001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.HashSet;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.Map.Entry;
026    import java.util.concurrent.ConcurrentHashMap;
027    
028    import org.apache.activemq.broker.ConnectionContext;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageAck;
032    import org.apache.activemq.command.MessageId;
033    import org.apache.activemq.command.SubscriptionInfo;
034    import org.apache.activemq.kaha.ListContainer;
035    import org.apache.activemq.kaha.MapContainer;
036    import org.apache.activemq.kaha.Marshaller;
037    import org.apache.activemq.kaha.Store;
038    import org.apache.activemq.kaha.StoreEntry;
039    import org.apache.activemq.store.MessageRecoveryListener;
040    import org.apache.activemq.store.TopicReferenceStore;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
045        private static final Logger LOG = LoggerFactory.getLogger(KahaTopicReferenceStore.class);
046        protected ListContainer<TopicSubAck> ackContainer;
047        protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
048        private MapContainer<String, SubscriptionInfo> subscriberContainer;
049        private Store store;
050        private static final String TOPIC_SUB_NAME = "tsn";
051    
052        public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
053                                       MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
054                                       MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination)
055            throws IOException {
056            super(adapter, messageContainer, destination);
057            this.store = store;
058            this.ackContainer = ackContainer;
059            subscriberContainer = subsContainer;
060            // load all the Ack containers
061            for (Iterator<SubscriptionInfo> i = subscriberContainer.values().iterator(); i.hasNext();) {
062                SubscriptionInfo info = i.next();
063                addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
064            }
065        }
066    
067        public void dispose(ConnectionContext context) {
068            super.dispose(context);
069            subscriberContainer.delete();
070        }
071    
072        protected MessageId getMessageId(Object object) {
073            return new MessageId(((ReferenceRecord)object).getMessageId());
074        }
075    
076        public void addMessage(ConnectionContext context, Message message) throws IOException {
077            throw new RuntimeException("Use addMessageReference instead");
078        }
079    
080        public Message getMessage(MessageId identity) throws IOException {
081            throw new RuntimeException("Use addMessageReference instead");
082        }
083    
084        public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
085                                        final ReferenceData data) {
086            boolean uniqueReferenceAdded = false;
087            lock.lock();
088            try {
089                final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
090                final int subscriberCount = subscriberMessages.size();
091                if (subscriberCount > 0 && !isDuplicate(messageId)) {
092                    final StoreEntry messageEntry = messageContainer.place(messageId, record);
093                    addInterest(record);
094                    uniqueReferenceAdded = true;
095                    final TopicSubAck tsa = new TopicSubAck();
096                    tsa.setCount(subscriberCount);
097                    tsa.setMessageEntry(messageEntry);
098                    final StoreEntry ackEntry = ackContainer.placeLast(tsa);
099                    for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
100                        final TopicSubContainer container = i.next();
101                        final ConsumerMessageRef ref = new ConsumerMessageRef();
102                        ref.setAckEntry(ackEntry);
103                        ref.setMessageEntry(messageEntry);
104                        ref.setMessageId(messageId);
105                        container.add(ref);
106                    }
107                    if (LOG.isTraceEnabled()) {
108                        LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
109                    }
110                } else {
111                    if (LOG.isTraceEnabled()) {
112                        LOG.trace("no subscribers or duplicate add for: "  + messageId);
113                    }
114                }
115            } finally {
116                lock.unlock();
117            }
118            return uniqueReferenceAdded;
119        }
120    
121        public ReferenceData getMessageReference(final MessageId identity) throws IOException {
122            final ReferenceRecord result = messageContainer.get(identity);
123            if (result == null) {
124                return null;
125            }
126            return result.getData();
127        }
128    
129        public void addReferenceFileIdsInUse() {
130            for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
131                TopicSubAck subAck = ackContainer.get(entry);
132                if (subAck.getCount() > 0) {
133                    ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
134                    addInterest(rr);
135                }
136            }
137        }
138    
139        
140        protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
141            String containerName = getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName));
142            MapContainer container = store.getMapContainer(containerName,containerName);
143            container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
144            Marshaller marshaller = new ConsumerMessageRefMarshaller();
145            container.setValueMarshaller(marshaller);
146            TopicSubContainer tsc = new TopicSubContainer(container);
147            subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
148            return container;
149        }
150    
151        public boolean acknowledgeReference(ConnectionContext context,
152                String clientId, String subscriptionName, MessageId messageId)
153                throws IOException {
154            boolean removeMessage = false;
155            lock.lock();
156                try {
157                String key = getSubscriptionKey(clientId, subscriptionName);
158        
159                TopicSubContainer container = subscriberMessages.get(key);
160                if (container != null) {
161                    ConsumerMessageRef ref = null;
162                    if((ref = container.remove(messageId)) != null) {
163                        StoreEntry entry = ref.getAckEntry();
164                        //ensure we get up to-date pointers
165                        entry = ackContainer.refresh(entry);
166                        TopicSubAck tsa = ackContainer.get(entry);
167                        if (tsa != null) {
168                            if (tsa.decrementCount() <= 0) {
169                                ackContainer.remove(entry);
170                                ReferenceRecord rr = messageContainer.get(messageId);
171                                if (rr != null) {
172                                    entry = tsa.getMessageEntry();
173                                    entry = messageContainer.refresh(entry);
174                                    messageContainer.remove(entry);
175                                    removeInterest(rr);
176                                    removeMessage = true;
177                                    dispatchAudit.isDuplicate(messageId);
178                                }
179                            }else {
180                                ackContainer.update(entry,tsa);
181                            }
182                        }
183                        if (LOG.isTraceEnabled()) {
184                            LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
185                        }
186                    }else{
187                        if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
188                            // no message reference held        
189                            removeMessage = true;
190                            // ensure we don't later add a reference
191                            dispatchAudit.isDuplicate(messageId);
192                            if (LOG.isDebugEnabled()) {
193                                LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (ack before add): " + messageId);
194                            }
195                        }
196                    }
197                }
198            }finally {
199                lock.unlock();
200            }
201            return removeMessage;
202        }
203        
204        // verify that no subscriber has a reference to this message. In the case where the subscribers
205        // references are persisted but more than the persisted consumers get the message, the ack from the non
206        // persisted consumer would remove the message in error
207        //
208        // see: https://issues.apache.org/activemq/browse/AMQ-2123
209        private boolean isUnreferencedBySubscribers(
210                String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
211            boolean isUnreferenced = true;
212            for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
213                if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) {
214                    TopicSubContainer container = entry.getValue();
215                    for (Iterator i = container.iterator(); i.hasNext();) {
216                        ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
217                        if (messageId.equals(ref.getMessageId())) {
218                            isUnreferenced = false;
219                            break;
220                        }
221                    }
222                }
223            }
224            return isUnreferenced; 
225        }
226    
227        public void acknowledge(ConnectionContext context,
228                            String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
229                acknowledgeReference(context, clientId, subscriptionName, messageId);
230            }
231    
232        public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
233            String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
234            lock.lock();
235            try {
236                // if already exists - won't add it again as it causes data files
237                // to hang around
238                if (!subscriberContainer.containsKey(key)) {
239                    subscriberContainer.put(key, info);
240                    adapter.addSubscriberState(info);
241                }
242                // add the subscriber
243                addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
244                if (retroactive) {
245                    /*
246                     * for(StoreEntry
247                     * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
248                     * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
249                     * ConsumerMessageRef ref=new ConsumerMessageRef();
250                     * ref.setAckEntry(entry);
251                     * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
252                     */
253                }
254            }finally {
255                lock.unlock();
256            }
257        }
258    
259        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
260            lock.lock();
261            try {
262                SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
263                if (info != null) {
264                    adapter.removeSubscriberState(info);
265                }
266            removeSubscriberMessageContainer(clientId,subscriptionName);
267            }finally {
268                lock.unlock();
269            }
270        }
271    
272        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
273            SubscriptionInfo[] result = subscriberContainer.values()
274                .toArray(new SubscriptionInfo[subscriberContainer.size()]);
275            return result;
276        }
277    
278        public int getMessageCount(String clientId, String subscriberName) throws IOException {
279            String key = getSubscriptionKey(clientId, subscriberName);
280            TopicSubContainer container = subscriberMessages.get(key);
281            return container != null ? container.size() : 0;
282        }
283    
284        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
285            return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
286        }
287    
288        public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
289                                                     MessageRecoveryListener listener) throws Exception {
290            String key = getSubscriptionKey(clientId, subscriptionName);
291            lock.lock();
292            try {
293                TopicSubContainer container = subscriberMessages.get(key);
294                if (container != null) {
295                    int count = 0;
296                    StoreEntry entry = container.getBatchEntry();
297                    if (entry == null) {
298                        entry = container.getEntry();
299                    } else {
300                        entry = container.refreshEntry(entry);
301                        if (entry != null) {
302                            entry = container.getNextEntry(entry);
303                        }
304                    }
305                   
306                    if (entry != null) {
307                        do {
308                            ConsumerMessageRef consumerRef = container.get(entry);
309                            ReferenceRecord msg = messageContainer.getValue(consumerRef
310                                    .getMessageEntry());
311                            if (msg != null) {
312                                if (recoverReference(listener, msg)) {
313                                    count++;
314                                    container.setBatchEntry(msg.getMessageId(), entry);
315                                }
316                            } else {
317                                container.reset();
318                            }
319        
320                            entry = container.getNextEntry(entry);
321                        } while (entry != null && count < maxReturned && listener.hasSpace());
322                    }
323                }
324            }finally {
325                lock.unlock();
326            }
327        }
328    
329        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
330            throws Exception {
331            String key = getSubscriptionKey(clientId, subscriptionName);
332            TopicSubContainer container = subscriberMessages.get(key);
333            if (container != null) {
334                for (Iterator i = container.iterator(); i.hasNext();) {
335                    ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
336                    ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry());
337                    if (msg != null) {
338                        if (!recoverReference(listener, msg)) {
339                            break;
340                        }
341                    }
342                }
343            }
344        }
345    
346        public void resetBatching(String clientId, String subscriptionName) {
347            lock.lock();
348            try {
349                String key = getSubscriptionKey(clientId, subscriptionName);
350                TopicSubContainer topicSubContainer = subscriberMessages.get(key);
351                if (topicSubContainer != null) {
352                    topicSubContainer.reset();
353                }
354            }finally {
355                lock.unlock();
356            }
357        }
358        
359        public void removeAllMessages(ConnectionContext context) throws IOException {
360            lock.lock();
361            try {
362                Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
363                for (String key:tmpSet) {
364                    TopicSubContainer container = subscriberMessages.get(key);
365                    if (container != null) {
366                        container.clear();
367                    }
368                }
369                ackContainer.clear();
370            }finally {
371                lock.unlock();
372            }
373            super.removeAllMessages(context);
374        }
375    
376        protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
377            String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
378            String containerName = getSubscriptionContainerName(subscriberKey);
379            subscriberContainer.remove(subscriberKey);
380            TopicSubContainer container = subscriberMessages.remove(subscriberKey);
381            if (container != null) {
382                for (Iterator i = container.iterator(); i.hasNext();) {
383                    ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
384                    if (ref != null) {
385                        TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
386                        if (tsa != null) {
387                            if (tsa.decrementCount() <= 0) {
388                                ackContainer.remove(ref.getAckEntry());
389                                messageContainer.remove(tsa.getMessageEntry());
390                            } else {
391                                ackContainer.update(ref.getAckEntry(), tsa);
392                            }
393                        }
394                    }
395                }
396            }
397            store.deleteMapContainer(containerName,containerName);
398        }
399    
400        protected String getSubscriptionKey(String clientId, String subscriberName) {
401            StringBuffer buffer = new StringBuffer();
402            buffer.append(clientId).append(":");  
403            String name = subscriberName != null ? subscriberName : "NOT_SET";
404            return buffer.append(name).toString();
405        }
406        
407        private String getSubscriptionContainerName(String subscriptionKey) {
408            StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
409            result.append(destination.getQualifiedName());
410            result.append(subscriptionKey);
411            return result.toString();
412        }
413    }