/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PersistenceAdapter} implementation layered on {@link MessageDatabase}.
 * <p>
 * Besides the plain store operations, this class owns two executors (one for
 * queues, one for topics) that run asynchronous store tasks, together with the
 * semaphores and job queues that bound how many such tasks may be outstanding.
 */
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);

    /** Default for {@link #maxAsyncJobs}. */
    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;

    /** Name of the system property that supplies {@link #cancelledTaskModMetric}. */
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";

    /** Parsed once at class load from {@link #PROPERTY_CANCELED_TASK_MOD_METRIC}; defaults to 0. */
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);

    /** Name of the system property that supplies the async executor thread limit. */
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";

    /**
     * Parsed once at class load from {@link #PROPERTY_ASYNC_EXECUTOR_MAX_THREADS}; defaults to 1.
     * Passed to both store task executors when they are created in {@code doStart()}.
     */
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);

    // Executors for asynchronous store tasks; created in doStart() and nulled in doStop().
    protected ExecutorService queueExecutor;
    protected ExecutorService topicExecutor;

    // Per-destination async task maps; doStop() walks these to cancel outstanding tasks.
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();

    final WireFormat wireFormat = new OpenWireFormat();
    private SystemUsage usageManager;

    // Bounded work queues backing queueExecutor / topicExecutor; sized from maxAsyncJobs in doStart().
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;

    // Created in doStart() with maxAsyncJobs permits each.
    Semaphore globalQueueSemaphore;
    Semaphore globalTopicSemaphore;

    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when cache is exhausted if store is out
    // of order w.r.t. cache
    private boolean concurrentStoreAndDispatchTopics = false;
    private final boolean concurrentStoreAndDispatchTransactions = false;
    private int maxAsyncJobs = MAX_ASYNC_JOBS;
    private final KahaDBTransactionStore transactionStore;
    private TransactionIdTransformer transactionIdTransformer;

    /**
     * Creates the store with its transaction store and an identity
     * {@link TransactionIdTransformer} (every transaction id is returned unchanged).
     */
    public KahaDBStore() {
        this.transactionStore = new KahaDBTransactionStore(this);
        this.transactionIdTransformer = new TransactionIdTransformer() {
            @Override
            public TransactionId transform(TransactionId txid) {
                return txid;
            }
        };
    }

    /**
     * @return {@code "KahaDB:[<absolute path of the store directory>]"}; {@code directory}
     *         is not declared in this class (presumably inherited from {@link MessageDatabase})
     */
    @Override
    public String toString() {
        return "KahaDB:[" + directory.getAbsolutePath() + "]";
    }

    /**
     * No-op: the supplied broker name is ignored by this implementation.
     *
     * @param brokerName unused
     */
    @Override
    public void setBrokerName(String brokerName) {
    }

    /**
     * @param usageManager the usage manager to remember; it is only stored here
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }

    /**
     * @return the usage manager last passed to {@link #setUsageManager(SystemUsage)},
     *         or {@code null} if none was set
     */
    public SystemUsage getUsageManager() {
        return this.usageManager;
    }

    /**
     * @return whether concurrent store-and-dispatch is enabled for queues (default {@code true})
     */
    public boolean isConcurrentStoreAndDispatchQueues() {
        return this.concurrentStoreAndDispatchQueues;
    }

    /**
     * @param concurrentStoreAndDispatch
     *            {@code true} to enable concurrent store-and-dispatch for queues
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }

    /**
     * @return whether concurrent store-and-dispatch is enabled for topics (default {@code false})
     */
    public boolean isConcurrentStoreAndDispatchTopics() {
        return this.concurrentStoreAndDispatchTopics;
    }
173 174 /** 175 * @param concurrentStoreAndDispatch 176 * the concurrentStoreAndDispatch to set 177 */ 178 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 179 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 180 } 181 182 public boolean isConcurrentStoreAndDispatchTransactions() { 183 return this.concurrentStoreAndDispatchTransactions; 184 } 185 186 /** 187 * @return the maxAsyncJobs 188 */ 189 public int getMaxAsyncJobs() { 190 return this.maxAsyncJobs; 191 } 192 193 /** 194 * @param maxAsyncJobs 195 * the maxAsyncJobs to set 196 */ 197 public void setMaxAsyncJobs(int maxAsyncJobs) { 198 this.maxAsyncJobs = maxAsyncJobs; 199 } 200 201 202 @Override 203 protected void configureMetadata() { 204 if (brokerService != null) { 205 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 206 wireFormat.setVersion(metadata.openwireVersion); 207 208 if (LOG.isDebugEnabled()) { 209 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 210 } 211 212 } 213 } 214 215 @Override 216 public void doStart() throws Exception { 217 //configure the metadata before start, right now 218 //this is just the open wire version 219 configureMetadata(); 220 221 super.doStart(); 222 223 if (brokerService != null) { 224 // In case the recovered store used a different OpenWire version log a warning 225 // to assist in determining why journal reads fail. 226 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 227 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 228 "than the version configured[{}] reverting to the version " + 229 "used by this store, some newer broker features may not work" + 230 "as expected.", 231 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 232 233 // Update the broker service instance to the actual version in use. 
234 wireFormat.setVersion(metadata.openwireVersion); 235 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 236 } 237 } 238 239 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 240 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 241 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 242 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 243 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 244 asyncQueueJobQueue, new ThreadFactory() { 245 @Override 246 public Thread newThread(Runnable runnable) { 247 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 248 thread.setDaemon(true); 249 return thread; 250 } 251 }); 252 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 253 asyncTopicJobQueue, new ThreadFactory() { 254 @Override 255 public Thread newThread(Runnable runnable) { 256 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 257 thread.setDaemon(true); 258 return thread; 259 } 260 }); 261 } 262 263 @Override 264 public void doStop(ServiceStopper stopper) throws Exception { 265 // drain down async jobs 266 LOG.info("Stopping async queue tasks"); 267 if (this.globalQueueSemaphore != null) { 268 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 269 } 270 synchronized (this.asyncQueueMaps) { 271 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 272 synchronized (m) { 273 for (StoreTask task : m.values()) { 274 task.cancel(); 275 } 276 } 277 } 278 this.asyncQueueMaps.clear(); 279 } 280 LOG.info("Stopping async topic tasks"); 281 if (this.globalTopicSemaphore != null) { 282 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 283 } 284 synchronized (this.asyncTopicMaps) { 285 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 286 synchronized (m) { 287 for (StoreTask task : 
m.values()) { 288 task.cancel(); 289 } 290 } 291 } 292 this.asyncTopicMaps.clear(); 293 } 294 if (this.globalQueueSemaphore != null) { 295 this.globalQueueSemaphore.drainPermits(); 296 } 297 if (this.globalTopicSemaphore != null) { 298 this.globalTopicSemaphore.drainPermits(); 299 } 300 if (this.queueExecutor != null) { 301 ThreadPoolUtils.shutdownNow(queueExecutor); 302 queueExecutor = null; 303 } 304 if (this.topicExecutor != null) { 305 ThreadPoolUtils.shutdownNow(topicExecutor); 306 topicExecutor = null; 307 } 308 LOG.info("Stopped KahaDB"); 309 super.doStop(stopper); 310 } 311 312 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 313 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 314 @Override 315 public Location execute(Transaction tx) throws IOException { 316 StoredDestination sd = getStoredDestination(destination, tx); 317 Long sequence = sd.messageIdIndex.get(tx, key); 318 if (sequence == null) { 319 return null; 320 } 321 return sd.orderIndex.get(tx, sequence).location; 322 } 323 }); 324 } 325 326 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 327 StoreQueueTask task = null; 328 synchronized (store.asyncTaskMap) { 329 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 330 } 331 return task; 332 } 333 334 // with asyncTaskMap locked 335 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 336 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 337 this.queueExecutor.execute(task); 338 } 339 340 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 341 StoreTopicTask task = null; 342 synchronized (store.asyncTaskMap) { 343 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 344 } 345 return task; 346 } 347 
348 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 349 synchronized (store.asyncTaskMap) { 350 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 351 } 352 this.topicExecutor.execute(task); 353 } 354 355 @Override 356 public TransactionStore createTransactionStore() throws IOException { 357 return this.transactionStore; 358 } 359 360 public boolean getForceRecoverIndex() { 361 return this.forceRecoverIndex; 362 } 363 364 public void setForceRecoverIndex(boolean forceRecoverIndex) { 365 this.forceRecoverIndex = forceRecoverIndex; 366 } 367 368 public class KahaDBMessageStore extends AbstractMessageStore { 369 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 370 protected KahaDestination dest; 371 private final int maxAsyncJobs; 372 private final Semaphore localDestinationSemaphore; 373 374 double doneTasks, canceledTasks = 0; 375 376 public KahaDBMessageStore(ActiveMQDestination destination) { 377 super(destination); 378 this.dest = convert(destination); 379 this.maxAsyncJobs = getMaxAsyncJobs(); 380 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 381 } 382 383 @Override 384 public ActiveMQDestination getDestination() { 385 return destination; 386 } 387 388 @Override 389 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 390 throws IOException { 391 if (isConcurrentStoreAndDispatchQueues()) { 392 message.beforeMarshall(wireFormat); 393 StoreQueueTask result = new StoreQueueTask(this, context, message); 394 ListenableFuture<Object> future = result.getFuture(); 395 message.getMessageId().setFutureOrSequenceLong(future); 396 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 397 result.aquireLocks(); 398 synchronized (asyncTaskMap) { 399 addQueueTask(this, result); 400 if (indexListener != null) { 401 
indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 402 } 403 } 404 return future; 405 } else { 406 return super.asyncAddQueueMessage(context, message); 407 } 408 } 409 410 @Override 411 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 412 if (isConcurrentStoreAndDispatchQueues()) { 413 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 414 StoreQueueTask task = null; 415 synchronized (asyncTaskMap) { 416 task = (StoreQueueTask) asyncTaskMap.get(key); 417 } 418 if (task != null) { 419 if (ack.isInTransaction() || !task.cancel()) { 420 try { 421 task.future.get(); 422 } catch (InterruptedException e) { 423 throw new InterruptedIOException(e.toString()); 424 } catch (Exception ignored) { 425 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 426 } 427 removeMessage(context, ack); 428 } else { 429 indexLock.writeLock().lock(); 430 try { 431 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 432 } finally { 433 indexLock.writeLock().unlock(); 434 } 435 synchronized (asyncTaskMap) { 436 asyncTaskMap.remove(key); 437 } 438 } 439 } else { 440 removeMessage(context, ack); 441 } 442 } else { 443 removeMessage(context, ack); 444 } 445 } 446 447 @Override 448 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 449 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 450 command.setDestination(dest); 451 command.setMessageId(message.getMessageId().toProducerKey()); 452 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 453 command.setPriority(message.getPriority()); 454 command.setPrioritySupported(isPrioritizedMessages()); 455 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 456 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 
457 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 458 // sync add? (for async, future present from getFutureOrSequenceLong) 459 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 460 461 @Override 462 public void sequenceAssignedWithIndexLocked(final long sequence) { 463 message.getMessageId().setFutureOrSequenceLong(sequence); 464 if (indexListener != null) { 465 if (possibleFuture == null) { 466 trackPendingAdd(dest, sequence); 467 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 468 @Override 469 public void run() { 470 trackPendingAddComplete(dest, sequence); 471 } 472 })); 473 } 474 } 475 } 476 }, null); 477 } 478 479 @Override 480 public void updateMessage(Message message) throws IOException { 481 if (LOG.isTraceEnabled()) { 482 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 483 } 484 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 485 KahaAddMessageCommand command = new KahaAddMessageCommand(); 486 command.setDestination(dest); 487 command.setMessageId(message.getMessageId().toProducerKey()); 488 command.setPriority(message.getPriority()); 489 command.setPrioritySupported(prioritizedMessages); 490 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 491 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 492 updateMessageCommand.setMessage(command); 493 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 494 } 495 496 @Override 497 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 498 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 499 command.setDestination(dest); 500 command.setMessageId(ack.getLastMessageId().toProducerKey()); 501 
command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 502 503 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 504 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 505 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 506 } 507 508 @Override 509 public void removeAllMessages(ConnectionContext context) throws IOException { 510 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 511 command.setDestination(dest); 512 store(command, true, null, null); 513 } 514 515 @Override 516 public Message getMessage(MessageId identity) throws IOException { 517 final String key = identity.toProducerKey(); 518 519 // Hopefully one day the page file supports concurrent read 520 // operations... but for now we must 521 // externally synchronize... 522 Location location; 523 indexLock.writeLock().lock(); 524 try { 525 location = findMessageLocation(key, dest); 526 } finally { 527 indexLock.writeLock().unlock(); 528 } 529 if (location == null) { 530 return null; 531 } 532 533 return loadMessage(location); 534 } 535 536 @Override 537 public boolean isEmpty() throws IOException { 538 indexLock.writeLock().lock(); 539 try { 540 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 541 @Override 542 public Boolean execute(Transaction tx) throws IOException { 543 // Iterate through all index entries to get a count of 544 // messages in the destination. 
545 StoredDestination sd = getStoredDestination(dest, tx); 546 return sd.locationIndex.isEmpty(tx); 547 } 548 }); 549 } finally { 550 indexLock.writeLock().unlock(); 551 } 552 } 553 554 @Override 555 public void recover(final MessageRecoveryListener listener) throws Exception { 556 // recovery may involve expiry which will modify 557 indexLock.writeLock().lock(); 558 try { 559 pageFile.tx().execute(new Transaction.Closure<Exception>() { 560 @Override 561 public void execute(Transaction tx) throws Exception { 562 StoredDestination sd = getStoredDestination(dest, tx); 563 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 564 sd.orderIndex.resetCursorPosition(); 565 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 566 .hasNext(); ) { 567 Entry<Long, MessageKeys> entry = iterator.next(); 568 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 569 continue; 570 } 571 Message msg = loadMessage(entry.getValue().location); 572 listener.recoverMessage(msg); 573 } 574 } 575 }); 576 } finally { 577 indexLock.writeLock().unlock(); 578 } 579 } 580 581 @Override 582 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 583 indexLock.writeLock().lock(); 584 try { 585 pageFile.tx().execute(new Transaction.Closure<Exception>() { 586 @Override 587 public void execute(Transaction tx) throws Exception { 588 StoredDestination sd = getStoredDestination(dest, tx); 589 Entry<Long, MessageKeys> entry = null; 590 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 591 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 592 entry = iterator.next(); 593 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 594 continue; 595 } 596 Message msg = loadMessage(entry.getValue().location); 597 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 598 listener.recoverMessage(msg); 
599 counter++; 600 if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { 601 break; 602 } 603 } 604 sd.orderIndex.stoppedIterating(); 605 } 606 }); 607 } finally { 608 indexLock.writeLock().unlock(); 609 } 610 } 611 612 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 613 int counter = 0; 614 String id; 615 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 616 id = iterator.next(); 617 iterator.remove(); 618 Long sequence = sd.messageIdIndex.get(tx, id); 619 if (sequence != null) { 620 if (sd.orderIndex.alreadyDispatched(sequence)) { 621 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 622 counter++; 623 if (counter >= maxReturned) { 624 break; 625 } 626 } else { 627 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 628 } 629 } else { 630 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 631 } 632 } 633 return counter; 634 } 635 636 637 @Override 638 public void resetBatching() { 639 if (pageFile.isLoaded()) { 640 indexLock.writeLock().lock(); 641 try { 642 pageFile.tx().execute(new Transaction.Closure<Exception>() { 643 @Override 644 public void execute(Transaction tx) throws Exception { 645 StoredDestination sd = getExistingStoredDestination(dest, tx); 646 if (sd != null) { 647 sd.orderIndex.resetCursorPosition();} 648 } 649 }); 650 } catch (Exception e) { 651 LOG.error("Failed to reset batching",e); 652 } finally { 653 indexLock.writeLock().unlock(); 654 } 655 } 656 } 657 658 @Override 659 public void setBatch(final MessageId identity) throws IOException { 660 indexLock.writeLock().lock(); 661 try { 662 pageFile.tx().execute(new Transaction.Closure<IOException>() { 663 @Override 664 public void execute(Transaction tx) throws IOException { 665 StoredDestination sd = getStoredDestination(dest, tx); 
666 Long location = (Long) identity.getFutureOrSequenceLong(); 667 Long pending = sd.orderIndex.minPendingAdd(); 668 if (pending != null) { 669 location = Math.min(location, pending-1); 670 } 671 sd.orderIndex.setBatch(tx, location); 672 } 673 }); 674 } finally { 675 indexLock.writeLock().unlock(); 676 } 677 } 678 679 @Override 680 public void setMemoryUsage(MemoryUsage memoryUsage) { 681 } 682 @Override 683 public void start() throws Exception { 684 super.start(); 685 } 686 @Override 687 public void stop() throws Exception { 688 super.stop(); 689 } 690 691 protected void lockAsyncJobQueue() { 692 try { 693 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 694 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 695 } 696 } catch (Exception e) { 697 LOG.error("Failed to lock async jobs for " + this.destination, e); 698 } 699 } 700 701 protected void unlockAsyncJobQueue() { 702 this.localDestinationSemaphore.release(this.maxAsyncJobs); 703 } 704 705 protected void acquireLocalAsyncLock() { 706 try { 707 this.localDestinationSemaphore.acquire(); 708 } catch (InterruptedException e) { 709 LOG.error("Failed to aquire async lock for " + this.destination, e); 710 } 711 } 712 713 protected void releaseLocalAsyncLock() { 714 this.localDestinationSemaphore.release(); 715 } 716 717 @Override 718 public String toString(){ 719 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 720 } 721 722 @Override 723 protected void recoverMessageStoreStatistics() throws IOException { 724 try { 725 MessageStoreStatistics recoveredStatistics; 726 lockAsyncJobQueue(); 727 indexLock.writeLock().lock(); 728 try { 729 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 730 @Override 731 public MessageStoreStatistics execute(Transaction tx) throws IOException { 732 
MessageStoreStatistics statistics = new MessageStoreStatistics(); 733 734 // Iterate through all index entries to get the size of each message 735 StoredDestination sd = getStoredDestination(dest, tx); 736 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 737 int locationSize = iterator.next().getKey().getSize(); 738 statistics.getMessageCount().increment(); 739 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 740 } 741 return statistics; 742 } 743 }); 744 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 745 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 746 } finally { 747 indexLock.writeLock().unlock(); 748 } 749 } finally { 750 unlockAsyncJobQueue(); 751 } 752 } 753 } 754 755 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 756 private final AtomicInteger subscriptionCount = new AtomicInteger(); 757 protected final MessageStoreSubscriptionStatistics messageStoreSubStats = 758 new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics()); 759 760 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 761 super(destination); 762 this.subscriptionCount.set(getAllSubscriptions().length); 763 if (isConcurrentStoreAndDispatchTopics()) { 764 asyncTopicMaps.add(asyncTaskMap); 765 } 766 } 767 768 @Override 769 protected void recoverMessageStoreStatistics() throws IOException { 770 super.recoverMessageStoreStatistics(); 771 this.recoverMessageStoreSubMetrics(); 772 } 773 774 @Override 775 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 776 throws IOException { 777 if (isConcurrentStoreAndDispatchTopics()) { 778 message.beforeMarshall(wireFormat); 779 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 
780 result.aquireLocks(); 781 addTopicTask(this, result); 782 return result.getFuture(); 783 } else { 784 return super.asyncAddTopicMessage(context, message); 785 } 786 } 787 788 @Override 789 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 790 MessageId messageId, MessageAck ack) throws IOException { 791 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 792 if (isConcurrentStoreAndDispatchTopics()) { 793 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 794 StoreTopicTask task = null; 795 synchronized (asyncTaskMap) { 796 task = (StoreTopicTask) asyncTaskMap.get(key); 797 } 798 if (task != null) { 799 if (task.addSubscriptionKey(subscriptionKey)) { 800 removeTopicTask(this, messageId); 801 if (task.cancel()) { 802 synchronized (asyncTaskMap) { 803 asyncTaskMap.remove(key); 804 } 805 } 806 } 807 } else { 808 doAcknowledge(context, subscriptionKey, messageId, ack); 809 } 810 } else { 811 doAcknowledge(context, subscriptionKey, messageId, ack); 812 } 813 } 814 815 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 816 throws IOException { 817 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 818 command.setDestination(dest); 819 command.setSubscriptionKey(subscriptionKey); 820 command.setMessageId(messageId.toProducerKey()); 821 command.setTransactionInfo(ack != null ? 
TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 822 if (ack != null && ack.isUnmatchedAck()) { 823 command.setAck(UNMATCHED); 824 } else { 825 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 826 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 827 } 828 store(command, false, null, null); 829 } 830 831 @Override 832 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 833 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 834 .getSubscriptionName()); 835 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 836 command.setDestination(dest); 837 command.setSubscriptionKey(subscriptionKey.toString()); 838 command.setRetroactive(retroactive); 839 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 840 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 841 store(command, isEnableJournalDiskSyncs() && true, null, null); 842 this.subscriptionCount.incrementAndGet(); 843 } 844 845 @Override 846 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 847 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 848 command.setDestination(dest); 849 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 850 store(command, isEnableJournalDiskSyncs() && true, null, null); 851 this.subscriptionCount.decrementAndGet(); 852 } 853 854 @Override 855 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 856 857 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 858 indexLock.writeLock().lock(); 859 try { 860 pageFile.tx().execute(new Transaction.Closure<IOException>() { 861 @Override 862 public void execute(Transaction tx) throws IOException { 863 StoredDestination sd = 
getStoredDestination(dest, tx); 864 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 865 .hasNext();) { 866 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 867 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 868 .getValue().getSubscriptionInfo().newInput())); 869 subscriptions.add(info); 870 871 } 872 } 873 }); 874 } finally { 875 indexLock.writeLock().unlock(); 876 } 877 878 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 879 subscriptions.toArray(rc); 880 return rc; 881 } 882 883 @Override 884 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 885 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 886 indexLock.writeLock().lock(); 887 try { 888 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 889 @Override 890 public SubscriptionInfo execute(Transaction tx) throws IOException { 891 StoredDestination sd = getStoredDestination(dest, tx); 892 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 893 if (command == null) { 894 return null; 895 } 896 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 897 .getSubscriptionInfo().newInput())); 898 } 899 }); 900 } finally { 901 indexLock.writeLock().unlock(); 902 } 903 } 904 905 @Override 906 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 907 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 908 909 if (isEnableSubscriptionStatistics()) { 910 return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); 911 } else { 912 913 indexLock.writeLock().lock(); 914 try { 915 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 916 @Override 917 public Integer execute(Transaction tx) throws IOException { 
918 StoredDestination sd = getStoredDestination(dest, tx); 919 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 920 if (cursorPos == null) { 921 // The subscription might not exist. 922 return 0; 923 } 924 925 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 926 } 927 }); 928 } finally { 929 indexLock.writeLock().unlock(); 930 } 931 } 932 } 933 934 935 @Override 936 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 937 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 938 if (isEnableSubscriptionStatistics()) { 939 return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); 940 } else { 941 indexLock.writeLock().lock(); 942 try { 943 return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() { 944 @Override 945 public Long execute(Transaction tx) throws IOException { 946 StoredDestination sd = getStoredDestination(dest, tx); 947 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 948 if (cursorPos == null) { 949 // The subscription might not exist. 
950 return 0l; 951 } 952 953 return getStoredMessageSize(tx, sd, subscriptionKey); 954 } 955 }); 956 } finally { 957 indexLock.writeLock().unlock(); 958 } 959 } 960 } 961 962 protected void recoverMessageStoreSubMetrics() throws IOException { 963 if (isEnableSubscriptionStatistics()) { 964 final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); 965 indexLock.writeLock().lock(); 966 try { 967 pageFile.tx().execute(new Transaction.Closure<IOException>() { 968 @Override 969 public void execute(Transaction tx) throws IOException { 970 StoredDestination sd = getStoredDestination(dest, tx); 971 972 List<String> subscriptionKeys = new ArrayList<>(); 973 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions 974 .iterator(tx); iterator.hasNext();) { 975 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 976 977 final String subscriptionKey = entry.getKey(); 978 final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 979 if (cursorPos != null) { 980 //add the subscriptions to a list for recovering pending sizes below 981 subscriptionKeys.add(subscriptionKey); 982 //recover just the count here as that is fast 983 statistics.getMessageCount(subscriptionKey) 984 .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); 985 } 986 } 987 988 //Recover the message sizes for each subscription by iterating only 1 time over the order index 989 //to speed up recovery 990 final Map<String, AtomicLong> subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys); 991 subPendingMessageSizes.forEach((k,v) -> { 992 statistics.getMessageSize(k).addSize(v.get() > 0 ? 
v.get() : 0); 993 }); 994 } 995 }); 996 } finally { 997 indexLock.writeLock().unlock(); 998 } 999 } 1000 } 1001 1002 @Override 1003 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 1004 throws Exception { 1005 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1006 @SuppressWarnings("unused") 1007 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1008 indexLock.writeLock().lock(); 1009 try { 1010 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1011 @Override 1012 public void execute(Transaction tx) throws Exception { 1013 StoredDestination sd = getStoredDestination(dest, tx); 1014 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 1015 SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); 1016 //If we have ackPositions tracked then compare the first one as individual acknowledge mode 1017 //may have bumped lastAck even though there are earlier messages to still consume 1018 if (subAckPositions != null && !subAckPositions.isEmpty() 1019 && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) { 1020 //we have messages to ack before lastAckedSequence 1021 sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1); 1022 } else { 1023 subAckPositions = null; 1024 sd.orderIndex.setBatch(tx, cursorPos); 1025 } 1026 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 1027 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 1028 .hasNext();) { 1029 Entry<Long, MessageKeys> entry = iterator.next(); 1030 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1031 continue; 1032 } 1033 //If subAckPositions is set then verify the sequence set contains the message still 1034 //and if it doesn't skip it 1035 if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { 1036 continue; 1037 } 1038 
listener.recoverMessage(loadMessage(entry.getValue().location)); 1039 } 1040 sd.orderIndex.resetCursorPosition(); 1041 } 1042 }); 1043 } finally { 1044 indexLock.writeLock().unlock(); 1045 } 1046 } 1047 1048 @Override 1049 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 1050 final MessageRecoveryListener listener) throws Exception { 1051 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1052 @SuppressWarnings("unused") 1053 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1054 indexLock.writeLock().lock(); 1055 try { 1056 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1057 @Override 1058 public void execute(Transaction tx) throws Exception { 1059 StoredDestination sd = getStoredDestination(dest, tx); 1060 sd.orderIndex.resetCursorPosition(); 1061 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 1062 SequenceSet subAckPositions = null; 1063 if (moc == null) { 1064 LastAck pos = getLastAck(tx, sd, subscriptionKey); 1065 if (pos == null) { 1066 // sub deleted 1067 return; 1068 } 1069 subAckPositions = getSequenceSet(tx, sd, subscriptionKey); 1070 //If we have ackPositions tracked then compare the first one as individual acknowledge mode 1071 //may have bumped lastAck even though there are earlier messages to still consume 1072 if (subAckPositions != null && !subAckPositions.isEmpty() 1073 && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) { 1074 //we have messages to ack before lastAckedSequence 1075 sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1); 1076 } else { 1077 subAckPositions = null; 1078 sd.orderIndex.setBatch(tx, pos); 1079 } 1080 moc = sd.orderIndex.cursor; 1081 } else { 1082 sd.orderIndex.cursor.sync(moc); 1083 } 1084 1085 Entry<Long, MessageKeys> entry = null; 1086 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 1087 for (Iterator<Entry<Long, MessageKeys>> iterator 
= sd.orderIndex.iterator(tx, moc); iterator 1088 .hasNext();) { 1089 entry = iterator.next(); 1090 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1091 continue; 1092 } 1093 //If subAckPositions is set then verify the sequence set contains the message still 1094 //and if it doesn't skip it 1095 if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { 1096 continue; 1097 } 1098 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 1099 counter++; 1100 } 1101 if (counter >= maxReturned || listener.hasSpace() == false) { 1102 break; 1103 } 1104 } 1105 sd.orderIndex.stoppedIterating(); 1106 if (entry != null) { 1107 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1108 sd.subscriptionCursors.put(subscriptionKey, copy); 1109 } 1110 } 1111 }); 1112 } finally { 1113 indexLock.writeLock().unlock(); 1114 } 1115 } 1116 1117 @Override 1118 public void resetBatching(String clientId, String subscriptionName) { 1119 try { 1120 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1121 indexLock.writeLock().lock(); 1122 try { 1123 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1124 @Override 1125 public void execute(Transaction tx) throws IOException { 1126 StoredDestination sd = getStoredDestination(dest, tx); 1127 sd.subscriptionCursors.remove(subscriptionKey); 1128 } 1129 }); 1130 }finally { 1131 indexLock.writeLock().unlock(); 1132 } 1133 } catch (IOException e) { 1134 throw new RuntimeException(e); 1135 } 1136 } 1137 1138 @Override 1139 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 1140 return messageStoreSubStats; 1141 } 1142 } 1143 1144 String subscriptionKey(String clientId, String subscriptionName) { 1145 return clientId + ":" + subscriptionName; 1146 } 1147 1148 @Override 1149 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1150 String key = key(convert(destination)); 1151 MessageStore store = 
storeCache.get(key(convert(destination))); 1152 if (store == null) { 1153 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1154 store = storeCache.putIfAbsent(key, queueStore); 1155 if (store == null) { 1156 store = queueStore; 1157 } 1158 } 1159 1160 return store; 1161 } 1162 1163 @Override 1164 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1165 String key = key(convert(destination)); 1166 MessageStore store = storeCache.get(key(convert(destination))); 1167 if (store == null) { 1168 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1169 store = storeCache.putIfAbsent(key, topicStore); 1170 if (store == null) { 1171 store = topicStore; 1172 } 1173 } 1174 1175 return (TopicMessageStore) store; 1176 } 1177 1178 /** 1179 * Cleanup method to remove any state associated with the given destination. 1180 * This method does not stop the message store (it might not be cached). 1181 * 1182 * @param destination 1183 * Destination to forget 1184 */ 1185 @Override 1186 public void removeQueueMessageStore(ActiveMQQueue destination) { 1187 } 1188 1189 /** 1190 * Cleanup method to remove any state associated with the given destination 1191 * This method does not stop the message store (it might not be cached). 
1192 * 1193 * @param destination 1194 * Destination to forget 1195 */ 1196 @Override 1197 public void removeTopicMessageStore(ActiveMQTopic destination) { 1198 } 1199 1200 @Override 1201 public void deleteAllMessages() throws IOException { 1202 deleteAllMessages = true; 1203 } 1204 1205 @Override 1206 public Set<ActiveMQDestination> getDestinations() { 1207 try { 1208 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1209 indexLock.writeLock().lock(); 1210 try { 1211 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1212 @Override 1213 public void execute(Transaction tx) throws IOException { 1214 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1215 .hasNext();) { 1216 Entry<String, StoredDestination> entry = iterator.next(); 1217 //Removing isEmpty topic check - see AMQ-5875 1218 rc.add(convert(entry.getKey())); 1219 } 1220 } 1221 }); 1222 }finally { 1223 indexLock.writeLock().unlock(); 1224 } 1225 return rc; 1226 } catch (IOException e) { 1227 throw new RuntimeException(e); 1228 } 1229 } 1230 1231 @Override 1232 public long getLastMessageBrokerSequenceId() throws IOException { 1233 return 0; 1234 } 1235 1236 @Override 1237 public long getLastProducerSequenceId(ProducerId id) { 1238 indexLock.writeLock().lock(); 1239 try { 1240 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1241 } finally { 1242 indexLock.writeLock().unlock(); 1243 } 1244 } 1245 1246 @Override 1247 public long size() { 1248 try { 1249 return journalSize.get() + getPageFile().getDiskSize(); 1250 } catch (IOException e) { 1251 throw new RuntimeException(e); 1252 } 1253 } 1254 1255 @Override 1256 public void beginTransaction(ConnectionContext context) throws IOException { 1257 throw new IOException("Not yet implemented."); 1258 } 1259 @Override 1260 public void commitTransaction(ConnectionContext context) throws IOException { 1261 throw new IOException("Not yet implemented."); 1262 } 1263 
@Override 1264 public void rollbackTransaction(ConnectionContext context) throws IOException { 1265 throw new IOException("Not yet implemented."); 1266 } 1267 1268 @Override 1269 public void checkpoint(boolean sync) throws IOException { 1270 super.checkpointCleanup(sync); 1271 } 1272 1273 // ///////////////////////////////////////////////////////////////// 1274 // Internal helper methods. 1275 // ///////////////////////////////////////////////////////////////// 1276 1277 /** 1278 * @param location 1279 * @return 1280 * @throws IOException 1281 */ 1282 Message loadMessage(Location location) throws IOException { 1283 try { 1284 JournalCommand<?> command = load(location); 1285 KahaAddMessageCommand addMessage = null; 1286 switch (command.type()) { 1287 case KAHA_UPDATE_MESSAGE_COMMAND: 1288 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1289 break; 1290 case KAHA_ADD_MESSAGE_COMMAND: 1291 addMessage = (KahaAddMessageCommand) command; 1292 break; 1293 default: 1294 throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location); 1295 } 1296 if (!addMessage.hasMessage()) { 1297 throw new IOException("Could not load journal record, null message content at location: " + location); 1298 } 1299 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1300 return msg; 1301 } catch (Throwable t) { 1302 IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t); 1303 LOG.error("Failed to load message at: {}", location , ioe); 1304 brokerService.handleIOException(ioe); 1305 throw ioe; 1306 } 1307 } 1308 1309 // ///////////////////////////////////////////////////////////////// 1310 // Internal conversion methods. 
1311 // ///////////////////////////////////////////////////////////////// 1312 1313 KahaLocation convert(Location location) { 1314 KahaLocation rc = new KahaLocation(); 1315 rc.setLogId(location.getDataFileId()); 1316 rc.setOffset(location.getOffset()); 1317 return rc; 1318 } 1319 1320 KahaDestination convert(ActiveMQDestination dest) { 1321 KahaDestination rc = new KahaDestination(); 1322 rc.setName(dest.getPhysicalName()); 1323 switch (dest.getDestinationType()) { 1324 case ActiveMQDestination.QUEUE_TYPE: 1325 rc.setType(DestinationType.QUEUE); 1326 return rc; 1327 case ActiveMQDestination.TOPIC_TYPE: 1328 rc.setType(DestinationType.TOPIC); 1329 return rc; 1330 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1331 rc.setType(DestinationType.TEMP_QUEUE); 1332 return rc; 1333 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1334 rc.setType(DestinationType.TEMP_TOPIC); 1335 return rc; 1336 default: 1337 return null; 1338 } 1339 } 1340 1341 ActiveMQDestination convert(String dest) { 1342 int p = dest.indexOf(":"); 1343 if (p < 0) { 1344 throw new IllegalArgumentException("Not in the valid destination format"); 1345 } 1346 int type = Integer.parseInt(dest.substring(0, p)); 1347 String name = dest.substring(p + 1); 1348 return convert(type, name); 1349 } 1350 1351 private ActiveMQDestination convert(KahaDestination commandDestination) { 1352 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1353 } 1354 1355 private ActiveMQDestination convert(int type, String name) { 1356 switch (KahaDestination.DestinationType.valueOf(type)) { 1357 case QUEUE: 1358 return new ActiveMQQueue(name); 1359 case TOPIC: 1360 return new ActiveMQTopic(name); 1361 case TEMP_QUEUE: 1362 return new ActiveMQTempQueue(name); 1363 case TEMP_TOPIC: 1364 return new ActiveMQTempTopic(name); 1365 default: 1366 throw new IllegalArgumentException("Not in the valid destination format"); 1367 } 1368 } 1369 1370 public TransactionIdTransformer getTransactionIdTransformer() { 1371 
return transactionIdTransformer; 1372 } 1373 1374 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1375 this.transactionIdTransformer = transactionIdTransformer; 1376 } 1377 1378 static class AsyncJobKey { 1379 MessageId id; 1380 ActiveMQDestination destination; 1381 1382 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1383 this.id = id; 1384 this.destination = destination; 1385 } 1386 1387 @Override 1388 public boolean equals(Object obj) { 1389 if (obj == this) { 1390 return true; 1391 } 1392 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1393 && destination.equals(((AsyncJobKey) obj).destination); 1394 } 1395 1396 @Override 1397 public int hashCode() { 1398 return id.hashCode() + destination.hashCode(); 1399 } 1400 1401 @Override 1402 public String toString() { 1403 return destination.getPhysicalName() + "-" + id; 1404 } 1405 } 1406 1407 public interface StoreTask { 1408 public boolean cancel(); 1409 1410 public void aquireLocks(); 1411 1412 public void releaseLocks(); 1413 } 1414 1415 class StoreQueueTask implements Runnable, StoreTask { 1416 protected final Message message; 1417 protected final ConnectionContext context; 1418 protected final KahaDBMessageStore store; 1419 protected final InnerFutureTask future; 1420 protected final AtomicBoolean done = new AtomicBoolean(); 1421 protected final AtomicBoolean locked = new AtomicBoolean(); 1422 1423 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1424 this.store = store; 1425 this.context = context; 1426 this.message = message; 1427 this.future = new InnerFutureTask(this); 1428 } 1429 1430 public ListenableFuture<Object> getFuture() { 1431 return this.future; 1432 } 1433 1434 @Override 1435 public boolean cancel() { 1436 if (this.done.compareAndSet(false, true)) { 1437 return this.future.cancel(false); 1438 } 1439 return false; 1440 } 1441 1442 @Override 1443 public void aquireLocks() { 
1444 if (this.locked.compareAndSet(false, true)) { 1445 try { 1446 globalQueueSemaphore.acquire(); 1447 store.acquireLocalAsyncLock(); 1448 message.incrementReferenceCount(); 1449 } catch (InterruptedException e) { 1450 LOG.warn("Failed to aquire lock", e); 1451 } 1452 } 1453 1454 } 1455 1456 @Override 1457 public void releaseLocks() { 1458 if (this.locked.compareAndSet(true, false)) { 1459 store.releaseLocalAsyncLock(); 1460 globalQueueSemaphore.release(); 1461 message.decrementReferenceCount(); 1462 } 1463 } 1464 1465 @Override 1466 public void run() { 1467 this.store.doneTasks++; 1468 try { 1469 if (this.done.compareAndSet(false, true)) { 1470 this.store.addMessage(context, message); 1471 removeQueueTask(this.store, this.message.getMessageId()); 1472 this.future.complete(); 1473 } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) { 1474 System.err.println(this.store.dest.getName() + " cancelled: " 1475 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1476 this.store.canceledTasks = this.store.doneTasks = 0; 1477 } 1478 } catch (Throwable t) { 1479 this.future.setException(t); 1480 removeQueueTask(this.store, this.message.getMessageId()); 1481 } 1482 } 1483 1484 protected Message getMessage() { 1485 return this.message; 1486 } 1487 1488 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1489 1490 private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); 1491 1492 public InnerFutureTask(Runnable runnable) { 1493 super(runnable, null); 1494 } 1495 1496 @Override 1497 public void setException(final Throwable e) { 1498 super.setException(e); 1499 } 1500 1501 public void complete() { 1502 super.set(null); 1503 } 1504 1505 @Override 1506 public void done() { 1507 fireListener(); 1508 } 1509 1510 @Override 1511 public void addListener(Runnable listener) { 1512 this.listenerRef.set(listener); 1513 if (isDone()) { 1514 fireListener(); 1515 } 1516 } 
1517 1518 private void fireListener() { 1519 Runnable listener = listenerRef.getAndSet(null); 1520 if (listener != null) { 1521 try { 1522 listener.run(); 1523 } catch (Exception ignored) { 1524 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1525 } 1526 } 1527 } 1528 } 1529 } 1530 1531 class StoreTopicTask extends StoreQueueTask { 1532 private final int subscriptionCount; 1533 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1534 private final KahaDBTopicMessageStore topicStore; 1535 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1536 int subscriptionCount) { 1537 super(store, context, message); 1538 this.topicStore = store; 1539 this.subscriptionCount = subscriptionCount; 1540 1541 } 1542 1543 @Override 1544 public void aquireLocks() { 1545 if (this.locked.compareAndSet(false, true)) { 1546 try { 1547 globalTopicSemaphore.acquire(); 1548 store.acquireLocalAsyncLock(); 1549 message.incrementReferenceCount(); 1550 } catch (InterruptedException e) { 1551 LOG.warn("Failed to aquire lock", e); 1552 } 1553 } 1554 } 1555 1556 @Override 1557 public void releaseLocks() { 1558 if (this.locked.compareAndSet(true, false)) { 1559 message.decrementReferenceCount(); 1560 store.releaseLocalAsyncLock(); 1561 globalTopicSemaphore.release(); 1562 } 1563 } 1564 1565 /** 1566 * add a key 1567 * 1568 * @param key 1569 * @return true if all acknowledgements received 1570 */ 1571 public boolean addSubscriptionKey(String key) { 1572 synchronized (this.subscriptionKeys) { 1573 this.subscriptionKeys.add(key); 1574 } 1575 return this.subscriptionKeys.size() >= this.subscriptionCount; 1576 } 1577 1578 @Override 1579 public void run() { 1580 this.store.doneTasks++; 1581 try { 1582 if (this.done.compareAndSet(false, true)) { 1583 this.topicStore.addMessage(context, message); 1584 // apply any acks we have 1585 synchronized (this.subscriptionKeys) { 1586 for (String key : 
this.subscriptionKeys) { 1587 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1588 1589 } 1590 } 1591 removeTopicTask(this.topicStore, this.message.getMessageId()); 1592 this.future.complete(); 1593 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1594 System.err.println(this.store.dest.getName() + " cancelled: " 1595 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1596 this.store.canceledTasks = this.store.doneTasks = 0; 1597 } 1598 } catch (Throwable t) { 1599 this.future.setException(t); 1600 removeTopicTask(this.topicStore, this.message.getMessageId()); 1601 } 1602 } 1603 } 1604 1605 public class StoreTaskExecutor extends ThreadPoolExecutor { 1606 1607 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1608 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1609 } 1610 1611 @Override 1612 protected void afterExecute(Runnable runnable, Throwable throwable) { 1613 super.afterExecute(runnable, throwable); 1614 1615 if (runnable instanceof StoreTask) { 1616 ((StoreTask)runnable).releaseLocks(); 1617 } 1618 } 1619 } 1620 1621 @Override 1622 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1623 return new JobSchedulerStoreImpl(); 1624 } 1625 1626 /* (non-Javadoc) 1627 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 1628 */ 1629 @Override 1630 public boolean isPersistNoLocal() { 1631 // Prior to v11 the broker did not store the noLocal value for durable subs. 1632 return brokerService.getStoreOpenWireVersion() >= 11; 1633 } 1634}