001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.ActiveMQMessageAudit;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.SubscriptionInfo;
038import org.apache.activemq.store.MessageRecoveryListener;
039import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
040import org.apache.activemq.store.TopicMessageStore;
041import org.apache.activemq.util.ByteSequence;
042import org.apache.activemq.util.IOExceptionSupport;
043import org.apache.activemq.wireformat.WireFormat;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
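/**
 * JDBC-backed {@link TopicMessageStore}. In addition to delegating subscription
 * and acknowledgement persistence to the {@code JDBCAdapter}, it remembers, per
 * subscription key, the last sequence id recovered for each priority, and keeps
 * a size-bounded cache that maps message ids to their sequence id and priority.
 */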
050public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
051
052    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
053    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
054    private Set<String> pendingCompletion = new HashSet<String>();
055
056    public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
057    private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
058               PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
059    private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
060    private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
061         @Override
062        protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
063           return size() > SEQUENCE_ID_CACHE_SIZE;
064        }
065    };
066
067
068    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
069        super(persistenceAdapter, adapter, wireFormat, topic, audit);
070    }
071
072    @Override
073    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
074        if (ack != null && ack.isUnmatchedAck()) {
075            if (LOG.isTraceEnabled()) {
076                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
077            }
078            return;
079        }
080        TransactionContext c = persistenceAdapter.getTransactionContext(context);
081        try {
082            long[] res = getCachedStoreSequenceId(c, destination, messageId);
083            if (this.isPrioritizedMessages()) {
084                adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
085            } else {
086                adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
087            }
088            if (LOG.isTraceEnabled()) {
089                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
090            }
091        } catch (SQLException e) {
092            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
093            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
094        } finally {
095            c.close();
096        }
097    }
098
099    public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
100        long[] val = null;
101        sequenceIdCacheSizeLock.readLock().lock();
102        try {
103            val = sequenceIdCache.get(messageId);
104        } finally {
105            sequenceIdCacheSizeLock.readLock().unlock();
106        }
107        if (val == null) {
108            val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
109        }
110        return val;
111    }
112
113    /**
114     * @throws Exception
115     */
116    @Override
117    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
118        TransactionContext c = persistenceAdapter.getTransactionContext();
119        try {
120            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
121                @Override
122                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
123                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
124                    msg.getMessageId().setBrokerSequenceId(sequenceId);
125                    return listener.recoverMessage(msg);
126                }
127
128                @Override
129                public boolean recoverMessageReference(String reference) throws Exception {
130                    return listener.recoverMessageReference(new MessageId(reference));
131                }
132
133            });
134        } catch (SQLException e) {
135            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
136            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
137        } finally {
138            c.close();
139        }
140    }
141
142    private class LastRecovered implements Iterable<LastRecoveredEntry> {
143        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
144        LastRecovered() {
145            for (int i=0; i<perPriority.length; i++) {
146                perPriority[i] = new LastRecoveredEntry(i);
147            }
148        }
149
150        public void updateStored(long sequence, int priority) {
151            perPriority[priority].stored = sequence;
152        }
153
154        public LastRecoveredEntry defaultPriority() {
155            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
156        }
157
158        @Override
159        public String toString() {
160            return Arrays.deepToString(perPriority);
161        }
162
163        @Override
164        public Iterator<LastRecoveredEntry> iterator() {
165            return new PriorityIterator();
166        }
167
168        class PriorityIterator implements Iterator<LastRecoveredEntry> {
169            int current = 9;
170            @Override
171            public boolean hasNext() {
172                for (int i=current; i>=0; i--) {
173                    if (perPriority[i].hasMessages()) {
174                        current = i;
175                        return true;
176                    }
177                }
178                return false;
179            }
180
181            @Override
182            public LastRecoveredEntry next() {
183                return perPriority[current];
184            }
185
186            @Override
187            public void remove() {
188                throw new RuntimeException("not implemented");
189            }
190        }
191    }
192
193    private class LastRecoveredEntry {
194        final int priority;
195        long recovered = 0;
196        long stored = Integer.MAX_VALUE;
197
198        public LastRecoveredEntry(int priority) {
199            this.priority = priority;
200        }
201
202        @Override
203        public String toString() {
204            return priority + "-" + stored + ":" + recovered;
205        }
206
207        public void exhausted() {
208            stored = recovered;
209        }
210
211        public boolean hasMessages() {
212            return stored > recovered;
213        }
214    }
215
216    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
217        final MessageRecoveryListener delegate;
218        final int maxMessages;
219        LastRecoveredEntry lastRecovered;
220        int recoveredCount;
221        int recoveredMarker;
222
223        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
224            this.delegate = delegate;
225            this.maxMessages = maxMessages;
226        }
227
228        @Override
229        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
230            if (delegate.hasSpace() && recoveredCount < maxMessages) {
231                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
232                msg.getMessageId().setBrokerSequenceId(sequenceId);
233                lastRecovered.recovered = sequenceId;
234                if (delegate.recoverMessage(msg)) {
235                    recoveredCount++;
236                    return true;
237                }
238            }
239            return false;
240        }
241
242        @Override
243        public boolean recoverMessageReference(String reference) throws Exception {
244            return delegate.recoverMessageReference(new MessageId(reference));
245        }
246
247        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
248            this.lastRecovered = lastRecovered;
249            recoveredMarker = recoveredCount;
250        }
251
252        public boolean complete() {
253            return  !delegate.hasSpace() || recoveredCount == maxMessages;
254        }
255
256        public boolean stalled() {
257            return recoveredMarker == recoveredCount;
258        }
259    }
260
261    @Override
262    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
263            throws Exception {
264        //Duration duration = new Duration("recoverNextMessages");
265        TransactionContext c = persistenceAdapter.getTransactionContext();
266
267        String key = getSubscriptionKey(clientId, subscriptionName);
268        if (!subscriberLastRecoveredMap.containsKey(key)) {
269           subscriberLastRecoveredMap.put(key, new LastRecovered());
270        }
271        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
272        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
273        try {
274            if (LOG.isTraceEnabled()) {
275                LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
276            }
277            if (isPrioritizedMessages()) {
278                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
279                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
280                    LastRecoveredEntry entry = it.next();
281                    recoveredAwareListener.setLastRecovered(entry);
282                    //Duration microDuration = new Duration("recoverNextMessages:loop");
283                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
284                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
285                    //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
286                    if (recoveredAwareListener.stalled()) {
287                        if (recoveredAwareListener.complete()) {
288                            break;
289                        } else {
290                            entry.exhausted();
291                        }
292                    }
293                }
294            } else {
295                LastRecoveredEntry last = lastRecovered.defaultPriority();
296                recoveredAwareListener.setLastRecovered(last);
297                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
298                        last.recovered, 0, maxReturned, recoveredAwareListener);
299            }
300            if (LOG.isTraceEnabled()) {
301                LOG.trace(key + " last recovered: " + lastRecovered);
302            }
303            //duration.end();
304        } catch (SQLException e) {
305            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
306        } finally {
307            c.close();
308        }
309    }
310
311    @Override
312    public void resetBatching(String clientId, String subscriptionName) {
313        String key = getSubscriptionKey(clientId, subscriptionName);
314        if (!pendingCompletion.contains(key))  {
315            subscriberLastRecoveredMap.remove(key);
316        } else {
317            LOG.trace(this +  ", skip resetBatch during pending completion for: " + key);
318        }
319    }
320
321    public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
322        final String key = getSubscriptionKey(clientId, subscriptionName);
323        LastRecovered recovered = new LastRecovered();
324        recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
325        subscriberLastRecoveredMap.put(key, recovered);
326        pendingCompletion.add(key);
327        LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
328    }
329
330    public void complete(String clientId, String subscriptionName) {
331        pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
332        LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
333    }
334
335    @Override
336    protected void onAdd(Message message, long sequenceId, byte priority) {
337        // update last recovered state
338        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
339            last.updateStored(sequenceId, priority);
340        }
341        sequenceIdCacheSizeLock.writeLock().lock();
342        try {
343            sequenceIdCache.put(message.getMessageId(), new long[]{sequenceId, priority});
344        } finally {
345            sequenceIdCacheSizeLock.writeLock().unlock();
346        }
347    }
348
349    @Override
350    public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
351        TransactionContext c = persistenceAdapter.getTransactionContext();
352        try {
353            c = persistenceAdapter.getTransactionContext();
354            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
355        } catch (SQLException e) {
356            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
357            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
358        } finally {
359            c.close();
360        }
361    }
362
363    /**
364     * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
365     *      String)
366     */
367    @Override
368    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
369        TransactionContext c = persistenceAdapter.getTransactionContext();
370        try {
371            return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
372        } catch (SQLException e) {
373            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
374            throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
375        } finally {
376            c.close();
377        }
378    }
379
380    @Override
381    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
382        TransactionContext c = persistenceAdapter.getTransactionContext();
383        try {
384            adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
385        } catch (SQLException e) {
386            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
387            throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
388        } finally {
389            c.close();
390            resetBatching(clientId, subscriptionName);
391        }
392    }
393
394    @Override
395    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
396        TransactionContext c = persistenceAdapter.getTransactionContext();
397        try {
398            return adapter.doGetAllSubscriptions(c, destination);
399        } catch (SQLException e) {
400            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
401            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
402        } finally {
403            c.close();
404        }
405    }
406
407    @Override
408    public int getMessageCount(String clientId, String subscriberName) throws IOException {
409        //Duration duration = new Duration("getMessageCount");
410        int result = 0;
411        TransactionContext c = persistenceAdapter.getTransactionContext();
412        try {
413            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
414        } catch (SQLException e) {
415            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
416            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
417        } finally {
418            c.close();
419        }
420        if (LOG.isTraceEnabled()) {
421            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
422        }
423        //duration.end();
424        return result;
425    }
426
427    @Override
428    public long getMessageSize(String clientId, String subscriberName) throws IOException {
429        return 0;
430    }
431
432    protected String getSubscriptionKey(String clientId, String subscriberName) {
433        String result = clientId + ":";
434        result += subscriberName != null ? subscriberName : "NOT_SET";
435        return result;
436    }
437
438    private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
439
440    @Override
441    public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
442        return stats;
443    }
444
445}