001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.Arrays;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.LinkedHashMap;
025 import java.util.Map;
026 import java.util.Set;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030 import org.apache.activemq.ActiveMQMessageAudit;
031 import org.apache.activemq.broker.ConnectionContext;
032 import org.apache.activemq.command.ActiveMQDestination;
033 import org.apache.activemq.command.ActiveMQTopic;
034 import org.apache.activemq.command.Message;
035 import org.apache.activemq.command.MessageAck;
036 import org.apache.activemq.command.MessageId;
037 import org.apache.activemq.command.SubscriptionInfo;
038 import org.apache.activemq.store.MessageRecoveryListener;
039 import org.apache.activemq.store.TopicMessageStore;
040 import org.apache.activemq.util.ByteSequence;
041 import org.apache.activemq.util.IOExceptionSupport;
042 import org.apache.activemq.wireformat.WireFormat;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 /**
047 *
048 */
049 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
050
051 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
052 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
053 private Set<String> pendingCompletion = new HashSet<String>();
054
055 public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
056 private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
057 PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
058 private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
059 private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
060 protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
061 return size() > SEQUENCE_ID_CACHE_SIZE;
062 }
063 };
064
065
066 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
067 super(persistenceAdapter, adapter, wireFormat, topic, audit);
068 }
069
070 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
071 if (ack != null && ack.isUnmatchedAck()) {
072 if (LOG.isTraceEnabled()) {
073 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
074 }
075 return;
076 }
077 TransactionContext c = persistenceAdapter.getTransactionContext(context);
078 try {
079 long[] res = getCachedStoreSequenceId(c, destination, messageId);
080 if (this.isPrioritizedMessages()) {
081 adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
082 } else {
083 adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
084 }
085 if (LOG.isTraceEnabled()) {
086 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
087 }
088 } catch (SQLException e) {
089 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
090 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
091 } finally {
092 c.close();
093 }
094 }
095
096 public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
097 long[] val = null;
098 sequenceIdCacheSizeLock.readLock().lock();
099 try {
100 val = sequenceIdCache.get(messageId);
101 } finally {
102 sequenceIdCacheSizeLock.readLock().unlock();
103 }
104 if (val == null) {
105 val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
106 }
107 return val;
108 }
109
110 /**
111 * @throws Exception
112 */
113 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
114 TransactionContext c = persistenceAdapter.getTransactionContext();
115 try {
116 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
117 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
118 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
119 msg.getMessageId().setBrokerSequenceId(sequenceId);
120 return listener.recoverMessage(msg);
121 }
122
123 public boolean recoverMessageReference(String reference) throws Exception {
124 return listener.recoverMessageReference(new MessageId(reference));
125 }
126
127 });
128 } catch (SQLException e) {
129 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
130 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
131 } finally {
132 c.close();
133 }
134 }
135
136 private class LastRecovered implements Iterable<LastRecoveredEntry> {
137 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
138 LastRecovered() {
139 for (int i=0; i<perPriority.length; i++) {
140 perPriority[i] = new LastRecoveredEntry(i);
141 }
142 }
143
144 public void updateStored(long sequence, int priority) {
145 perPriority[priority].stored = sequence;
146 }
147
148 public LastRecoveredEntry defaultPriority() {
149 return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
150 }
151
152 public String toString() {
153 return Arrays.deepToString(perPriority);
154 }
155
156 public Iterator<LastRecoveredEntry> iterator() {
157 return new PriorityIterator();
158 }
159
160 class PriorityIterator implements Iterator<LastRecoveredEntry> {
161 int current = 9;
162 public boolean hasNext() {
163 for (int i=current; i>=0; i--) {
164 if (perPriority[i].hasMessages()) {
165 current = i;
166 return true;
167 }
168 }
169 return false;
170 }
171
172 public LastRecoveredEntry next() {
173 return perPriority[current];
174 }
175
176 public void remove() {
177 throw new RuntimeException("not implemented");
178 }
179 }
180 }
181
182 private class LastRecoveredEntry {
183 final int priority;
184 long recovered = 0;
185 long stored = Integer.MAX_VALUE;
186
187 public LastRecoveredEntry(int priority) {
188 this.priority = priority;
189 }
190
191 public String toString() {
192 return priority + "-" + stored + ":" + recovered;
193 }
194
195 public void exhausted() {
196 stored = recovered;
197 }
198
199 public boolean hasMessages() {
200 return stored > recovered;
201 }
202 }
203
204 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
205 final MessageRecoveryListener delegate;
206 final int maxMessages;
207 LastRecoveredEntry lastRecovered;
208 int recoveredCount;
209 int recoveredMarker;
210
211 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
212 this.delegate = delegate;
213 this.maxMessages = maxMessages;
214 }
215
216 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
217 if (delegate.hasSpace() && recoveredCount < maxMessages) {
218 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
219 msg.getMessageId().setBrokerSequenceId(sequenceId);
220 lastRecovered.recovered = sequenceId;
221 if (delegate.recoverMessage(msg)) {
222 recoveredCount++;
223 return true;
224 }
225 }
226 return false;
227 }
228
229 public boolean recoverMessageReference(String reference) throws Exception {
230 return delegate.recoverMessageReference(new MessageId(reference));
231 }
232
233 public void setLastRecovered(LastRecoveredEntry lastRecovered) {
234 this.lastRecovered = lastRecovered;
235 recoveredMarker = recoveredCount;
236 }
237
238 public boolean complete() {
239 return !delegate.hasSpace() || recoveredCount == maxMessages;
240 }
241
242 public boolean stalled() {
243 return recoveredMarker == recoveredCount;
244 }
245 }
246
247 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
248 throws Exception {
249 //Duration duration = new Duration("recoverNextMessages");
250 TransactionContext c = persistenceAdapter.getTransactionContext();
251
252 String key = getSubscriptionKey(clientId, subscriptionName);
253 if (!subscriberLastRecoveredMap.containsKey(key)) {
254 subscriberLastRecoveredMap.put(key, new LastRecovered());
255 }
256 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
257 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
258 try {
259 if (LOG.isTraceEnabled()) {
260 LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
261 }
262 if (isPrioritizedMessages()) {
263 Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
264 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
265 LastRecoveredEntry entry = it.next();
266 recoveredAwareListener.setLastRecovered(entry);
267 //Duration microDuration = new Duration("recoverNextMessages:loop");
268 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
269 entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
270 //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
271 if (recoveredAwareListener.stalled()) {
272 if (recoveredAwareListener.complete()) {
273 break;
274 } else {
275 entry.exhausted();
276 }
277 }
278 }
279 } else {
280 LastRecoveredEntry last = lastRecovered.defaultPriority();
281 recoveredAwareListener.setLastRecovered(last);
282 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
283 last.recovered, 0, maxReturned, recoveredAwareListener);
284 }
285 if (LOG.isTraceEnabled()) {
286 LOG.trace(key + " last recovered: " + lastRecovered);
287 }
288 //duration.end();
289 } catch (SQLException e) {
290 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
291 } finally {
292 c.close();
293 }
294 }
295
296 public void resetBatching(String clientId, String subscriptionName) {
297 String key = getSubscriptionKey(clientId, subscriptionName);
298 if (!pendingCompletion.contains(key)) {
299 subscriberLastRecoveredMap.remove(key);
300 } else {
301 LOG.trace(this + ", skip resetBatch during pending completion for: " + key);
302 }
303 }
304
305 public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
306 final String key = getSubscriptionKey(clientId, subscriptionName);
307 LastRecovered recovered = new LastRecovered();
308 recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
309 subscriberLastRecoveredMap.put(key, recovered);
310 pendingCompletion.add(key);
311 LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
312 }
313
314 public void complete(String clientId, String subscriptionName) {
315 pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
316 LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
317 }
318
319 protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
320 // update last recovered state
321 for (LastRecovered last : subscriberLastRecoveredMap.values()) {
322 last.updateStored(sequenceId, priority);
323 }
324 sequenceIdCacheSizeLock.writeLock().lock();
325 try {
326 sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
327 } finally {
328 sequenceIdCacheSizeLock.writeLock().unlock();
329 }
330 }
331
332
333 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
334 TransactionContext c = persistenceAdapter.getTransactionContext();
335 try {
336 c = persistenceAdapter.getTransactionContext();
337 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
338 } catch (SQLException e) {
339 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
340 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
341 } finally {
342 c.close();
343 }
344 }
345
346 /**
347 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
348 * String)
349 */
350 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
351 TransactionContext c = persistenceAdapter.getTransactionContext();
352 try {
353 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
354 } catch (SQLException e) {
355 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
356 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
357 } finally {
358 c.close();
359 }
360 }
361
362 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
363 TransactionContext c = persistenceAdapter.getTransactionContext();
364 try {
365 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
366 } catch (SQLException e) {
367 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
368 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
369 } finally {
370 c.close();
371 resetBatching(clientId, subscriptionName);
372 }
373 }
374
375 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
376 TransactionContext c = persistenceAdapter.getTransactionContext();
377 try {
378 return adapter.doGetAllSubscriptions(c, destination);
379 } catch (SQLException e) {
380 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
381 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
382 } finally {
383 c.close();
384 }
385 }
386
387 public int getMessageCount(String clientId, String subscriberName) throws IOException {
388 //Duration duration = new Duration("getMessageCount");
389 int result = 0;
390 TransactionContext c = persistenceAdapter.getTransactionContext();
391 try {
392 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
393 } catch (SQLException e) {
394 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
395 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
396 } finally {
397 c.close();
398 }
399 if (LOG.isTraceEnabled()) {
400 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
401 }
402 //duration.end();
403 return result;
404 }
405
406 protected String getSubscriptionKey(String clientId, String subscriberName) {
407 String result = clientId + ":";
408 result += subscriberName != null ? subscriberName : "NOT_SET";
409 return result;
410 }
411
412 }