001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.util.Iterator;
020    
021    import org.apache.activemq.command.MessageId;
022    import org.apache.activemq.kaha.MapContainer;
023    import org.apache.activemq.kaha.StoreEntry;
024    
025    /**
026     * Holds information for the subscriber
027     * 
028     * 
029     */
030    public class TopicSubContainer {
031        private transient MapContainer mapContainer;
032        private transient StoreEntry batchEntry;
033    
034        public TopicSubContainer(MapContainer container) {
035            this.mapContainer = container;
036        }
037    
038        /**
039         * @return the batchEntry
040         */
041        public StoreEntry getBatchEntry() {
042            return this.batchEntry;
043        }
044    
045        /**
046         * @param id
047         * @param batchEntry the batchEntry to set
048         */
049        public void setBatchEntry(String id, StoreEntry batchEntry) {
050            this.batchEntry = batchEntry;
051        }
052    
053        public void reset() {
054            batchEntry = null;
055        }
056    
057        public boolean isEmpty() {
058            return mapContainer.isEmpty();
059        }
060    
061        public StoreEntry add(ConsumerMessageRef ref) {
062            return mapContainer.place(ref.getMessageId(),ref);
063        }
064    
065        public ConsumerMessageRef remove(MessageId id) {
066            ConsumerMessageRef result = null;
067            StoreEntry entry = mapContainer.getEntry(id);
068            if (entry != null) {
069                result = (ConsumerMessageRef) mapContainer.getValue(entry);
070                mapContainer.remove(entry);
071                if (batchEntry != null && batchEntry.equals(entry)) {
072                    reset();
073                }
074            }
075            if(mapContainer.isEmpty()) {
076                reset();
077            }
078            return result;
079        }
080        
081        
082        public ConsumerMessageRef get(StoreEntry entry) {
083            return (ConsumerMessageRef)mapContainer.getValue(entry);
084        }
085    
086        public StoreEntry getEntry() {
087            return mapContainer.getFirst();
088        }
089    
090        public StoreEntry refreshEntry(StoreEntry entry) {
091            return mapContainer.refresh(entry);
092        }
093    
094        public StoreEntry getNextEntry(StoreEntry entry) {
095            return mapContainer.getNext(entry);
096        }
097    
098        public Iterator iterator() {
099            return mapContainer.values().iterator();
100        }
101    
102        public int size() {
103            return mapContainer.size();
104        }
105    
106        public void clear() {
107            reset();
108            mapContainer.clear();
109        }
110    }