001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.command.MessageAck;
029    import org.apache.activemq.command.MessageId;
030    import org.apache.activemq.command.SubscriptionInfo;
031    import org.apache.activemq.store.MessageRecoveryListener;
032    import org.apache.activemq.store.TopicMessageStore;
033    import org.apache.activemq.util.LRUCache;
034    import org.apache.activemq.util.SubscriptionKey;
035    
036    /**
037     * 
038     */
039    public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
040    
041        private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
042        private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
043    
044        public MemoryTopicMessageStore(ActiveMQDestination destination) {
045            this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
046        }
047    
048        public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
049            super(destination, messageTable);
050            this.subscriberDatabase = subscriberDatabase;
051            this.topicSubMap = makeSubMap();
052        }
053    
054        protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
055            return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
056        }
057        
058        protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
059            return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
060        }
061    
062        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
063            super.addMessage(context, message);
064            for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
065                MemoryTopicSub sub = i.next();
066                sub.addMessage(message.getMessageId(), message);
067            }
068        }
069    
070        public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
071                                             MessageId messageId, MessageAck ack) throws IOException {
072            SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
073            MemoryTopicSub sub = topicSubMap.get(key);
074            if (sub != null) {
075                sub.removeMessage(messageId);
076            }
077        }
078    
079        public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
080            return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
081        }
082    
083        public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
084            SubscriptionKey key = new SubscriptionKey(info);
085            MemoryTopicSub sub = new MemoryTopicSub();
086            topicSubMap.put(key, sub);
087            if (retroactive) {
088                for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
089                    Map.Entry entry = (Entry)i.next();
090                    sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
091                }
092            }
093            subscriberDatabase.put(key, info);
094        }
095    
096        public synchronized void deleteSubscription(String clientId, String subscriptionName) {
097            org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
098            subscriberDatabase.remove(key);
099            topicSubMap.remove(key);
100        }
101    
102        public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
103            MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
104            if (sub != null) {
105                sub.recoverSubscription(listener);
106            }
107        }
108    
109        public synchronized void delete() {
110            super.delete();
111            subscriberDatabase.clear();
112            topicSubMap.clear();
113        }
114    
115        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
116            return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
117        }
118    
119        public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
120            int result = 0;
121            MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
122            if (sub != null) {
123                result = sub.size();
124            }
125            return result;
126        }
127    
128        public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
129            MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
130            if (sub != null) {
131                sub.recoverNextMessages(maxReturned, listener);
132            }
133        }
134    
135        public void resetBatching(String clientId, String subscriptionName) {
136            MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
137            if (sub != null) {
138                sub.resetBatching();
139            }
140        }
141    }