001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.ActiveMQMessageAudit;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.SubscriptionInfo;
038import org.apache.activemq.store.MessageRecoveryListener;
039import org.apache.activemq.store.TopicMessageStore;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.IOExceptionSupport;
042import org.apache.activemq.wireformat.WireFormat;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 *
048 */
049public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
050
051    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
052    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
053    private Set<String> pendingCompletion = new HashSet<String>();
054
055    public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
056    private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
057               PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
058    private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
059    private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
060         @Override
061        protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
062           return size() > SEQUENCE_ID_CACHE_SIZE;
063        }
064    };
065
066
067    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
068        super(persistenceAdapter, adapter, wireFormat, topic, audit);
069    }
070
071    @Override
072    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
073        if (ack != null && ack.isUnmatchedAck()) {
074            if (LOG.isTraceEnabled()) {
075                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
076            }
077            return;
078        }
079        TransactionContext c = persistenceAdapter.getTransactionContext(context);
080        try {
081            long[] res = getCachedStoreSequenceId(c, destination, messageId);
082            if (this.isPrioritizedMessages()) {
083                adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
084            } else {
085                adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
086            }
087            if (LOG.isTraceEnabled()) {
088                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
089            }
090        } catch (SQLException e) {
091            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
092            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
093        } finally {
094            c.close();
095        }
096    }
097
098    public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
099        long[] val = null;
100        sequenceIdCacheSizeLock.readLock().lock();
101        try {
102            val = sequenceIdCache.get(messageId);
103        } finally {
104            sequenceIdCacheSizeLock.readLock().unlock();
105        }
106        if (val == null) {
107            val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
108        }
109        return val;
110    }
111
112    /**
113     * @throws Exception
114     */
115    @Override
116    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
117        TransactionContext c = persistenceAdapter.getTransactionContext();
118        try {
119            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
120                @Override
121                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
122                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
123                    msg.getMessageId().setBrokerSequenceId(sequenceId);
124                    return listener.recoverMessage(msg);
125                }
126
127                @Override
128                public boolean recoverMessageReference(String reference) throws Exception {
129                    return listener.recoverMessageReference(new MessageId(reference));
130                }
131
132            });
133        } catch (SQLException e) {
134            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
135            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
136        } finally {
137            c.close();
138        }
139    }
140
141    private class LastRecovered implements Iterable<LastRecoveredEntry> {
142        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
143        LastRecovered() {
144            for (int i=0; i<perPriority.length; i++) {
145                perPriority[i] = new LastRecoveredEntry(i);
146            }
147        }
148
149        public void updateStored(long sequence, int priority) {
150            perPriority[priority].stored = sequence;
151        }
152
153        public LastRecoveredEntry defaultPriority() {
154            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
155        }
156
157        @Override
158        public String toString() {
159            return Arrays.deepToString(perPriority);
160        }
161
162        @Override
163        public Iterator<LastRecoveredEntry> iterator() {
164            return new PriorityIterator();
165        }
166
167        class PriorityIterator implements Iterator<LastRecoveredEntry> {
168            int current = 9;
169            @Override
170            public boolean hasNext() {
171                for (int i=current; i>=0; i--) {
172                    if (perPriority[i].hasMessages()) {
173                        current = i;
174                        return true;
175                    }
176                }
177                return false;
178            }
179
180            @Override
181            public LastRecoveredEntry next() {
182                return perPriority[current];
183            }
184
185            @Override
186            public void remove() {
187                throw new RuntimeException("not implemented");
188            }
189        }
190    }
191
192    private class LastRecoveredEntry {
193        final int priority;
194        long recovered = 0;
195        long stored = Integer.MAX_VALUE;
196
197        public LastRecoveredEntry(int priority) {
198            this.priority = priority;
199        }
200
201        @Override
202        public String toString() {
203            return priority + "-" + stored + ":" + recovered;
204        }
205
206        public void exhausted() {
207            stored = recovered;
208        }
209
210        public boolean hasMessages() {
211            return stored > recovered;
212        }
213    }
214
215    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
216        final MessageRecoveryListener delegate;
217        final int maxMessages;
218        LastRecoveredEntry lastRecovered;
219        int recoveredCount;
220        int recoveredMarker;
221
222        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
223            this.delegate = delegate;
224            this.maxMessages = maxMessages;
225        }
226
227        @Override
228        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
229            if (delegate.hasSpace() && recoveredCount < maxMessages) {
230                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
231                msg.getMessageId().setBrokerSequenceId(sequenceId);
232                lastRecovered.recovered = sequenceId;
233                if (delegate.recoverMessage(msg)) {
234                    recoveredCount++;
235                    return true;
236                }
237            }
238            return false;
239        }
240
241        @Override
242        public boolean recoverMessageReference(String reference) throws Exception {
243            return delegate.recoverMessageReference(new MessageId(reference));
244        }
245
246        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
247            this.lastRecovered = lastRecovered;
248            recoveredMarker = recoveredCount;
249        }
250
251        public boolean complete() {
252            return  !delegate.hasSpace() || recoveredCount == maxMessages;
253        }
254
255        public boolean stalled() {
256            return recoveredMarker == recoveredCount;
257        }
258    }
259
260    @Override
261    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
262            throws Exception {
263        //Duration duration = new Duration("recoverNextMessages");
264        TransactionContext c = persistenceAdapter.getTransactionContext();
265
266        String key = getSubscriptionKey(clientId, subscriptionName);
267        if (!subscriberLastRecoveredMap.containsKey(key)) {
268           subscriberLastRecoveredMap.put(key, new LastRecovered());
269        }
270        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
271        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
272        try {
273            if (LOG.isTraceEnabled()) {
274                LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
275            }
276            if (isPrioritizedMessages()) {
277                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
278                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
279                    LastRecoveredEntry entry = it.next();
280                    recoveredAwareListener.setLastRecovered(entry);
281                    //Duration microDuration = new Duration("recoverNextMessages:loop");
282                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
283                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
284                    //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
285                    if (recoveredAwareListener.stalled()) {
286                        if (recoveredAwareListener.complete()) {
287                            break;
288                        } else {
289                            entry.exhausted();
290                        }
291                    }
292                }
293            } else {
294                LastRecoveredEntry last = lastRecovered.defaultPriority();
295                recoveredAwareListener.setLastRecovered(last);
296                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
297                        last.recovered, 0, maxReturned, recoveredAwareListener);
298            }
299            if (LOG.isTraceEnabled()) {
300                LOG.trace(key + " last recovered: " + lastRecovered);
301            }
302            //duration.end();
303        } catch (SQLException e) {
304            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
305        } finally {
306            c.close();
307        }
308    }
309
310    @Override
311    public void resetBatching(String clientId, String subscriptionName) {
312        String key = getSubscriptionKey(clientId, subscriptionName);
313        if (!pendingCompletion.contains(key))  {
314            subscriberLastRecoveredMap.remove(key);
315        } else {
316            LOG.trace(this +  ", skip resetBatch during pending completion for: " + key);
317        }
318    }
319
320    public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
321        final String key = getSubscriptionKey(clientId, subscriptionName);
322        LastRecovered recovered = new LastRecovered();
323        recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
324        subscriberLastRecoveredMap.put(key, recovered);
325        pendingCompletion.add(key);
326        LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
327    }
328
329    public void complete(String clientId, String subscriptionName) {
330        pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
331        LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
332    }
333
334    @Override
335    protected void onAdd(Message message, long sequenceId, byte priority) {
336        // update last recovered state
337        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
338            last.updateStored(sequenceId, priority);
339        }
340        sequenceIdCacheSizeLock.writeLock().lock();
341        try {
342            sequenceIdCache.put(message.getMessageId(), new long[]{sequenceId, priority});
343        } finally {
344            sequenceIdCacheSizeLock.writeLock().unlock();
345        }
346    }
347
348    @Override
349    public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
350        TransactionContext c = persistenceAdapter.getTransactionContext();
351        try {
352            c = persistenceAdapter.getTransactionContext();
353            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
354        } catch (SQLException e) {
355            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
356            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
357        } finally {
358            c.close();
359        }
360    }
361
362    /**
363     * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
364     *      String)
365     */
366    @Override
367    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
368        TransactionContext c = persistenceAdapter.getTransactionContext();
369        try {
370            return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
371        } catch (SQLException e) {
372            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
373            throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
374        } finally {
375            c.close();
376        }
377    }
378
379    @Override
380    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
381        TransactionContext c = persistenceAdapter.getTransactionContext();
382        try {
383            adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
384        } catch (SQLException e) {
385            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
386            throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
387        } finally {
388            c.close();
389            resetBatching(clientId, subscriptionName);
390        }
391    }
392
393    @Override
394    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
395        TransactionContext c = persistenceAdapter.getTransactionContext();
396        try {
397            return adapter.doGetAllSubscriptions(c, destination);
398        } catch (SQLException e) {
399            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
400            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
401        } finally {
402            c.close();
403        }
404    }
405
406    @Override
407    public int getMessageCount(String clientId, String subscriberName) throws IOException {
408        //Duration duration = new Duration("getMessageCount");
409        int result = 0;
410        TransactionContext c = persistenceAdapter.getTransactionContext();
411        try {
412            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
413        } catch (SQLException e) {
414            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
415            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
416        } finally {
417            c.close();
418        }
419        if (LOG.isTraceEnabled()) {
420            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
421        }
422        //duration.end();
423        return result;
424    }
425
426    @Override
427    public long getMessageSize(String clientId, String subscriberName) throws IOException {
428        return 0;
429    }
430
431    protected String getSubscriptionKey(String clientId, String subscriberName) {
432        String result = clientId + ":";
433        result += subscriberName != null ? subscriberName : "NOT_SET";
434        return result;
435    }
436
437}