001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.concurrent.ConcurrentHashMap;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.command.ActiveMQDestination;
025    import org.apache.activemq.command.Message;
026    import org.apache.activemq.command.MessageAck;
027    import org.apache.activemq.command.MessageId;
028    import org.apache.activemq.command.SubscriptionInfo;
029    import org.apache.activemq.kaha.ListContainer;
030    import org.apache.activemq.kaha.MapContainer;
031    import org.apache.activemq.kaha.Marshaller;
032    import org.apache.activemq.kaha.Store;
033    import org.apache.activemq.kaha.StoreEntry;
034    import org.apache.activemq.store.MessageRecoveryListener;
035    import org.apache.activemq.store.TopicMessageStore;
036    
037    /**
038     * 
039     */
040    public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
041    
042        protected ListContainer<TopicSubAck> ackContainer;
043        protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();
044        private Map<String, SubscriptionInfo> subscriberContainer;
045        private Store store;
046    
047        public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer,
048                                     ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer,
049                                     ActiveMQDestination destination) throws IOException {
050            super(messageContainer, destination);
051            this.store = store;
052            this.ackContainer = ackContainer;
053            subscriberContainer = subsContainer;
054            // load all the Ack containers
055            for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) {
056                Object key = i.next();
057                addSubscriberMessageContainer(key);
058            }
059        }
060    
061        @Override
062        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
063            int subscriberCount = subscriberMessages.size();
064            if (subscriberCount > 0) {
065                MessageId id = message.getMessageId();
066                StoreEntry messageEntry = messageContainer.place(id, message);
067                TopicSubAck tsa = new TopicSubAck();
068                tsa.setCount(subscriberCount);
069                tsa.setMessageEntry(messageEntry);
070                StoreEntry ackEntry = ackContainer.placeLast(tsa);
071                for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
072                    TopicSubContainer container = i.next();
073                    ConsumerMessageRef ref = new ConsumerMessageRef();
074                    ref.setAckEntry(ackEntry);
075                    ref.setMessageEntry(messageEntry);
076                    ref.setMessageId(id);
077                    container.add(ref);
078                }
079            }
080        }
081    
082        public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
083                                             MessageId messageId, MessageAck ack) throws IOException {
084            String subcriberId = getSubscriptionKey(clientId, subscriptionName);
085            TopicSubContainer container = subscriberMessages.get(subcriberId);
086            if (container != null) {
087                ConsumerMessageRef ref = container.remove(messageId);
088                if (container.isEmpty()) {
089                    container.reset();
090                }
091                if (ref != null) {
092                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
093                    if (tsa != null) {
094                        if (tsa.decrementCount() <= 0) {
095                            StoreEntry entry = ref.getAckEntry();
096                            entry = ackContainer.refresh(entry);
097                            ackContainer.remove(entry);
098                            entry = tsa.getMessageEntry();
099                            entry = messageContainer.refresh(entry);
100                            messageContainer.remove(entry);
101                        } else {
102                            ackContainer.update(ref.getAckEntry(), tsa);
103                        }
104                    }
105                }
106            }
107        }
108    
109        public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
110            return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
111        }
112    
113        public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
114            String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
115            // if already exists - won't add it again as it causes data files
116            // to hang around
117            if (!subscriberContainer.containsKey(key)) {
118                subscriberContainer.put(key, info);
119            }
120            // add the subscriber
121            addSubscriberMessageContainer(key);
122            /*
123             * if(retroactive){ for(StoreEntry
124             * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
125             * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
126             * ConsumerMessageRef ref=new ConsumerMessageRef();
127             * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry());
128             * container.add(ref); } }
129             */
130        }
131    
132        public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
133            String key = getSubscriptionKey(clientId, subscriptionName);
134            removeSubscriberMessageContainer(key);
135        }
136    
137        public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
138            throws Exception {
139            String key = getSubscriptionKey(clientId, subscriptionName);
140            TopicSubContainer container = subscriberMessages.get(key);
141            if (container != null) {
142                for (Iterator i = container.iterator(); i.hasNext();) {
143                    ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
144                    Message msg = messageContainer.get(ref.getMessageEntry());
145                    if (msg != null) {
146                        if (!recoverMessage(listener, msg)) {
147                            break;
148                        }
149                    }
150                }
151            }
152        }
153    
154        public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
155                                        MessageRecoveryListener listener) throws Exception {
156            String key = getSubscriptionKey(clientId, subscriptionName);
157            TopicSubContainer container = subscriberMessages.get(key);
158            if (container != null) {
159                int count = 0;
160                StoreEntry entry = container.getBatchEntry();
161                if (entry == null) {
162                    entry = container.getEntry();
163                } else {
164                    entry = container.refreshEntry(entry);
165                    if (entry != null) {
166                        entry = container.getNextEntry(entry);
167                    }
168                }
169                if (entry != null) {
170                    do {
171                        ConsumerMessageRef consumerRef = container.get(entry);
172                        Message msg = messageContainer.getValue(consumerRef.getMessageEntry());
173                        if (msg != null) {
174                            recoverMessage(listener, msg);
175                            count++;
176                            container.setBatchEntry(msg.getMessageId().toString(), entry);
177                        } else {
178                            container.reset();
179                        }
180    
181                        entry = container.getNextEntry(entry);
182                    } while (entry != null && count < maxReturned && listener.hasSpace());
183                }
184            }
185        }
186    
187        public synchronized void delete() {
188            super.delete();
189            ackContainer.clear();
190            subscriberContainer.clear();
191        }
192    
193        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
194            return subscriberContainer.values()
195                .toArray(new SubscriptionInfo[subscriberContainer.size()]);
196        }
197    
198        protected String getSubscriptionKey(String clientId, String subscriberName) {
199            String result = clientId + ":";
200            result += subscriberName != null ? subscriberName : "NOT_SET";
201            return result;
202        }
203    
204        protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
205            MapContainer container = store.getMapContainer(key, "topic-subs");
206            container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
207            Marshaller marshaller = new ConsumerMessageRefMarshaller();
208            container.setValueMarshaller(marshaller);
209            TopicSubContainer tsc = new TopicSubContainer(container);
210            subscriberMessages.put(key, tsc);
211            return container;
212        }
213    
214        protected synchronized void removeSubscriberMessageContainer(Object key)
215                throws IOException {
216            subscriberContainer.remove(key);
217            TopicSubContainer container = subscriberMessages.remove(key);
218            if (container != null) {
219                for (Iterator i = container.iterator(); i.hasNext();) {
220                    ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
221                    if (ref != null) {
222                        TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
223                        if (tsa != null) {
224                            if (tsa.decrementCount() <= 0) {
225                                ackContainer.remove(ref.getAckEntry());
226                                messageContainer.remove(tsa.getMessageEntry());
227                            } else {
228                                ackContainer.update(ref.getAckEntry(), tsa);
229                            }
230                        }
231                    }
232                }
233                container.clear();
234            }
235            store.deleteListContainer(key, "topic-subs");
236    
237        }
238    
239        public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
240            String key = getSubscriptionKey(clientId, subscriberName);
241            TopicSubContainer container = subscriberMessages.get(key);
242            return container != null ? container.size() : 0;
243        }
244    
245        /**
246         * @param context
247         * @throws IOException
248         * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
249         */
250        public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
251            messageContainer.clear();
252            ackContainer.clear();
253            for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
254                TopicSubContainer container = i.next();
255                container.clear();
256            }
257        }
258    
259        public synchronized void resetBatching(String clientId, String subscriptionName) {
260            String key = getSubscriptionKey(clientId, subscriptionName);
261            TopicSubContainer topicSubContainer = subscriberMessages.get(key);
262            if (topicSubContainer != null) {
263                topicSubContainer.reset();
264            }
265        }
266    }