001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.Arrays;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.LinkedHashMap;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.ActiveMQMessageAudit;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.command.ActiveMQDestination;
033    import org.apache.activemq.command.ActiveMQTopic;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageId;
037    import org.apache.activemq.command.SubscriptionInfo;
038    import org.apache.activemq.store.MessageRecoveryListener;
039    import org.apache.activemq.store.TopicMessageStore;
040    import org.apache.activemq.util.ByteSequence;
041    import org.apache.activemq.util.IOExceptionSupport;
042    import org.apache.activemq.wireformat.WireFormat;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * 
048     */
049    public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
050    
051        private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
052        private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
053        private Set<String> pendingCompletion = new HashSet<String>();
054    
055        public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
056        private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
057                   PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
058        private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
059        private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
060             protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
061               return size() > SEQUENCE_ID_CACHE_SIZE;
062            }
063        };
064    
065    
066        public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
067            super(persistenceAdapter, adapter, wireFormat, topic, audit);
068        }
069    
070        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
071            if (ack != null && ack.isUnmatchedAck()) {
072                if (LOG.isTraceEnabled()) {
073                    LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
074                }
075                return;
076            }
077            TransactionContext c = persistenceAdapter.getTransactionContext(context);
078            try {
079                long[] res = getCachedStoreSequenceId(c, destination, messageId);
080                if (this.isPrioritizedMessages()) {
081                    adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
082                } else {
083                    adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
084                }
085                if (LOG.isTraceEnabled()) {
086                    LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
087                }
088            } catch (SQLException e) {
089                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
090                throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
091            } finally {
092                c.close();
093            }
094        }
095    
096        public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
097            long[] val = null;
098            sequenceIdCacheSizeLock.readLock().lock();
099            try {
100                val = sequenceIdCache.get(messageId);
101            } finally {
102                sequenceIdCacheSizeLock.readLock().unlock();
103            }
104            if (val == null) {
105                val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
106            }
107            return val;
108        }
109    
110        /**
111         * @throws Exception
112         */
113        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
114            TransactionContext c = persistenceAdapter.getTransactionContext();
115            try {
116                adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
117                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
118                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
119                        msg.getMessageId().setBrokerSequenceId(sequenceId);
120                        return listener.recoverMessage(msg);
121                    }
122    
123                    public boolean recoverMessageReference(String reference) throws Exception {
124                        return listener.recoverMessageReference(new MessageId(reference));
125                    }
126    
127                });
128            } catch (SQLException e) {
129                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
130                throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
131            } finally {
132                c.close();
133            }
134        }
135    
136        private class LastRecovered implements Iterable<LastRecoveredEntry> {
137            LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
138            LastRecovered() {
139                for (int i=0; i<perPriority.length; i++) {
140                    perPriority[i] = new LastRecoveredEntry(i);
141                }
142            }
143    
144            public void updateStored(long sequence, int priority) {
145                perPriority[priority].stored = sequence;
146            }
147    
148            public LastRecoveredEntry defaultPriority() {
149                return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
150            }
151    
152            public String toString() {
153                return Arrays.deepToString(perPriority);
154            }
155    
156            public Iterator<LastRecoveredEntry> iterator() {
157                return new PriorityIterator();
158            }
159    
160            class PriorityIterator implements Iterator<LastRecoveredEntry> {
161                int current = 9;
162                public boolean hasNext() {
163                    for (int i=current; i>=0; i--) {
164                        if (perPriority[i].hasMessages()) {
165                            current = i;
166                            return true;
167                        }
168                    }
169                    return false;
170                }
171    
172                public LastRecoveredEntry next() {
173                    return perPriority[current];
174                }
175    
176                public void remove() {
177                    throw new RuntimeException("not implemented");
178                }
179            }
180        }
181    
182        private class LastRecoveredEntry {
183            final int priority;
184            long recovered = 0;
185            long stored = Integer.MAX_VALUE;
186    
187            public LastRecoveredEntry(int priority) {
188                this.priority = priority;
189            }
190    
191            public String toString() {
192                return priority + "-" + stored + ":" + recovered;
193            }
194    
195            public void exhausted() {
196                stored = recovered;
197            }
198    
199            public boolean hasMessages() {
200                return stored > recovered;
201            }
202        }
203    
204        class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
205            final MessageRecoveryListener delegate;
206            final int maxMessages;
207            LastRecoveredEntry lastRecovered;
208            int recoveredCount;
209            int recoveredMarker;
210    
211            public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
212                this.delegate = delegate;
213                this.maxMessages = maxMessages;
214            }
215    
216            public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
217                if (delegate.hasSpace() && recoveredCount < maxMessages) {
218                    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
219                    msg.getMessageId().setBrokerSequenceId(sequenceId);
220                    lastRecovered.recovered = sequenceId;
221                    if (delegate.recoverMessage(msg)) {
222                        recoveredCount++;
223                        return true;
224                    }
225                }
226                return false;
227            }
228    
229            public boolean recoverMessageReference(String reference) throws Exception {
230                return delegate.recoverMessageReference(new MessageId(reference));
231            }
232    
233            public void setLastRecovered(LastRecoveredEntry lastRecovered) {
234                this.lastRecovered = lastRecovered;
235                recoveredMarker = recoveredCount;
236            }
237    
238            public boolean complete() {
239                return  !delegate.hasSpace() || recoveredCount == maxMessages;
240            }
241    
242            public boolean stalled() {
243                return recoveredMarker == recoveredCount;
244            }
245        }
246    
247        public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
248                throws Exception {
249            //Duration duration = new Duration("recoverNextMessages");
250            TransactionContext c = persistenceAdapter.getTransactionContext();
251    
252            String key = getSubscriptionKey(clientId, subscriptionName);
253            if (!subscriberLastRecoveredMap.containsKey(key)) {
254               subscriberLastRecoveredMap.put(key, new LastRecovered());
255            }
256            final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
257            LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
258            try {
259                if (LOG.isTraceEnabled()) {
260                    LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
261                }
262                if (isPrioritizedMessages()) {
263                    Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
264                    for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
265                        LastRecoveredEntry entry = it.next();
266                        recoveredAwareListener.setLastRecovered(entry);
267                        //Duration microDuration = new Duration("recoverNextMessages:loop");
268                        adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
269                            entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
270                        //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
271                        if (recoveredAwareListener.stalled()) {
272                            if (recoveredAwareListener.complete()) {
273                                break;
274                            } else {
275                                entry.exhausted();
276                            }
277                        }
278                    }
279                } else {
280                    LastRecoveredEntry last = lastRecovered.defaultPriority();
281                    recoveredAwareListener.setLastRecovered(last);
282                    adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
283                            last.recovered, 0, maxReturned, recoveredAwareListener);
284                }
285                if (LOG.isTraceEnabled()) {
286                    LOG.trace(key + " last recovered: " + lastRecovered);
287                }
288                //duration.end();
289            } catch (SQLException e) {
290                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
291            } finally {
292                c.close();
293            }
294        }
295    
296        public void resetBatching(String clientId, String subscriptionName) {
297            String key = getSubscriptionKey(clientId, subscriptionName);
298            if (!pendingCompletion.contains(key))  {
299                subscriberLastRecoveredMap.remove(key);
300            } else {
301                LOG.trace(this +  ", skip resetBatch during pending completion for: " + key);
302            }
303        }
304    
305        public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
306            final String key = getSubscriptionKey(clientId, subscriptionName);
307            LastRecovered recovered = new LastRecovered();
308            recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
309            subscriberLastRecoveredMap.put(key, recovered);
310            pendingCompletion.add(key);
311            LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
312        }
313    
314        public void complete(String clientId, String subscriptionName) {
315            pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
316            LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
317        }
318    
319        protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
320            // update last recovered state
321            for (LastRecovered last : subscriberLastRecoveredMap.values()) {
322                last.updateStored(sequenceId, priority);
323            }
324            sequenceIdCacheSizeLock.writeLock().lock();
325            try {
326                sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
327            } finally {
328                sequenceIdCacheSizeLock.writeLock().unlock();
329            }
330        }
331    
332    
333        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
334            TransactionContext c = persistenceAdapter.getTransactionContext();
335            try {
336                c = persistenceAdapter.getTransactionContext();
337                adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
338            } catch (SQLException e) {
339                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
340                throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
341            } finally {
342                c.close();
343            }
344        }
345    
346        /**
347         * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
348         *      String)
349         */
350        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
351            TransactionContext c = persistenceAdapter.getTransactionContext();
352            try {
353                return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
354            } catch (SQLException e) {
355                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
356                throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
357            } finally {
358                c.close();
359            }
360        }
361    
362        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
363            TransactionContext c = persistenceAdapter.getTransactionContext();
364            try {
365                adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
366            } catch (SQLException e) {
367                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
368                throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
369            } finally {
370                c.close();
371                resetBatching(clientId, subscriptionName);
372            }
373        }
374    
375        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
376            TransactionContext c = persistenceAdapter.getTransactionContext();
377            try {
378                return adapter.doGetAllSubscriptions(c, destination);
379            } catch (SQLException e) {
380                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
381                throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
382            } finally {
383                c.close();
384            }
385        }
386    
387        public int getMessageCount(String clientId, String subscriberName) throws IOException {
388            //Duration duration = new Duration("getMessageCount");
389            int result = 0;
390            TransactionContext c = persistenceAdapter.getTransactionContext();
391            try {
392                result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
393            } catch (SQLException e) {
394                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
395                throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
396            } finally {
397                c.close();
398            }
399            if (LOG.isTraceEnabled()) {
400                LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
401            }
402            //duration.end();
403            return result;
404        }
405    
406        protected String getSubscriptionKey(String clientId, String subscriberName) {
407            String result = clientId + ":";
408            result += subscriberName != null ? subscriberName : "NOT_SET";
409            return result;
410        }
411    
412    }