001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.atomic.AtomicLong;
029    
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ActiveMQQueue;
033    import org.apache.activemq.command.ActiveMQTopic;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.SubscriptionInfo;
036    import org.apache.activemq.command.TransactionId;
037    import org.apache.activemq.kaha.CommandMarshaller;
038    import org.apache.activemq.kaha.ListContainer;
039    import org.apache.activemq.kaha.MapContainer;
040    import org.apache.activemq.kaha.MessageIdMarshaller;
041    import org.apache.activemq.kaha.Store;
042    import org.apache.activemq.kaha.StoreFactory;
043    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
044    import org.apache.activemq.store.MessageStore;
045    import org.apache.activemq.store.ReferenceStore;
046    import org.apache.activemq.store.ReferenceStoreAdapter;
047    import org.apache.activemq.store.TopicMessageStore;
048    import org.apache.activemq.store.TopicReferenceStore;
049    import org.apache.activemq.store.amq.AMQTx;
050    import org.apache.activemq.util.IOHelper;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
055    
056        
057    
058        private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStoreAdapter.class);
059        private static final String STORE_STATE = "store-state";
060        private static final String QUEUE_DATA = "queue-data";
061        private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
062        private static final Integer INDEX_VERSION = new Integer(7);
063        private static final String RECORD_REFERENCES = "record-references";
064        private static final String TRANSACTIONS = "transactions-state";
065        private MapContainer stateMap;
066        private MapContainer<TransactionId, AMQTx> preparedTransactions;
067        private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>();
068        private ListContainer<SubscriptionInfo> durableSubscribers;
069        private boolean storeValid;
070        private Store stateStore;
071        private boolean persistentIndex = true;
072        private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
073        private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
074        private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
075        private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
076        private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
077       
078    
079        public KahaReferenceStoreAdapter(AtomicLong size){
080            super(size);
081        }
082        
083        public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
084            throw new RuntimeException("Use createQueueReferenceStore instead");
085        }
086    
087        public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
088            throws IOException {
089            throw new RuntimeException("Use createTopicReferenceStore instead");
090        }
091    
092        @Override
093        public synchronized void start() throws Exception {
094            super.start();
095            Store store = getStateStore();        
096            boolean empty = store.getMapContainerIds().isEmpty();
097            stateMap = store.getMapContainer("state", STORE_STATE);
098            stateMap.load();
099            storeValid=true;
100            if (!empty) {
101                AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
102                if (status != null) {
103                    storeValid = status.get();
104                }
105               
106                if (storeValid) {
107                    //check what version the indexes are at
108                    Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
109                    if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
110                        storeValid = false;
111                        LOG.warn("Indexes at an older version - need to regenerate");
112                    }
113                }
114                if (storeValid) {
115                    if (stateMap.containsKey(RECORD_REFERENCES)) {
116                        recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
117                    }
118                }
119            }
120            stateMap.put(STORE_STATE, new AtomicBoolean());
121            stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
122            durableSubscribers = store.getListContainer("durableSubscribers");
123            durableSubscribers.setMarshaller(new CommandMarshaller());
124            preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
125            // need to set the Marshallers here
126            preparedTransactions.setKeyMarshaller(Store.COMMAND_MARSHALLER);
127            preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
128        }
129    
130        @Override
131        public synchronized void stop() throws Exception {
132            stateMap.put(RECORD_REFERENCES, recordReferences);
133            stateMap.put(STORE_STATE, new AtomicBoolean(true));
134            stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
135            if (this.stateStore != null) {
136                this.stateStore.close();
137                this.stateStore = null;
138                this.stateMap = null;
139            }
140            super.stop();
141        }
142        
143        public void commitTransaction(ConnectionContext context) throws IOException {
144            //we don;t need to force on a commit - as the reference store
145            //is rebuilt on a non clean shutdown
146        }
147    
148        public boolean isStoreValid() {
149            return storeValid;
150        }
151    
152        public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
153            ReferenceStore rc = (ReferenceStore)queues.get(destination);
154            if (rc == null) {
155                rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, QUEUE_DATA),
156                                            destination);
157                messageStores.put(destination, rc);
158                // if(transactionStore!=null){
159                // rc=transactionStore.proxy(rc);
160                // }
161                queues.put(destination, rc);
162            }
163            return rc;
164        }
165    
166        public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
167            TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination);
168            if (rc == null) {
169                Store store = getStore();
170                MapContainer messageContainer = getMapReferenceContainer(destination.getPhysicalName(), "topic-data");
171                MapContainer subsContainer = getSubsMapContainer(destination.getPhysicalName() + "-Subscriptions", "blob");
172                ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.getPhysicalName(), "topic-acks");
173                ackContainer.setMarshaller(new TopicSubAckMarshaller());
174                rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer,
175                                                 destination);
176                messageStores.put(destination, rc);
177                // if(transactionStore!=null){
178                // rc=transactionStore.proxy(rc);
179                // }
180                topics.put(destination, rc);
181            }
182            return rc;
183        }
184    
185        public void removeReferenceStore(KahaReferenceStore referenceStore) {
186            ActiveMQDestination destination = referenceStore.getDestination();
187            if (destination.isQueue()) {
188                queues.remove(destination);
189                try {
190                    getStore().deleteMapContainer(destination, QUEUE_DATA);
191                } catch (IOException e) {
192                    LOG.error("Failed to delete " + QUEUE_DATA + " map container for destination: " + destination, e);
193                }
194            } else {
195                topics.remove(destination);
196            }
197            messageStores.remove(destination);
198        }
199    /*
200        public void buildReferenceFileIdsInUse() throws IOException {
201            recordReferences = new HashMap<Integer, AtomicInteger>();
202            Set<ActiveMQDestination> destinations = getDestinations();
203            for (ActiveMQDestination destination : destinations) {
204                if (destination.isQueue()) {
205                    KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
206                    store.addReferenceFileIdsInUse();
207                } else {
208                    KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
209                    store.addReferenceFileIdsInUse();
210                }
211            }
212        }
213        */
214    
215        protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
216                                                                                    String containerName)
217            throws IOException {
218            Store store = getStore();
219            MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex);
220            container.setIndexBinSize(getIndexBinSize());
221            container.setIndexKeySize(getIndexKeySize());
222            container.setIndexPageSize(getIndexPageSize());
223            container.setIndexMaxBinSize(getIndexMaxBinSize());
224            container.setIndexLoadFactor(getIndexLoadFactor());
225            container.setKeyMarshaller(new MessageIdMarshaller());
226            container.setValueMarshaller(new ReferenceRecordMarshaller());
227            container.load();
228            return container;
229        }
230    
231        synchronized void addInterestInRecordFile(int recordNumber) {
232            Integer key = Integer.valueOf(recordNumber);
233            AtomicInteger rr = recordReferences.get(key);
234            if (rr == null) {
235                rr = new AtomicInteger();
236                recordReferences.put(key, rr);
237            }
238            rr.incrementAndGet();
239        }
240    
241        synchronized void removeInterestInRecordFile(int recordNumber) {
242            Integer key = Integer.valueOf(recordNumber);
243            AtomicInteger rr = recordReferences.get(key);
244            if (rr != null && rr.decrementAndGet() <= 0) {
245                recordReferences.remove(key);
246            }
247        }
248    
249        /**
250         * @return
251         * @throws IOException
252         * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
253         */
254        public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
255            Set inUse = new HashSet<Integer>(recordReferences.keySet());
256    
257            Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator();
258            while (ackReferences.hasNext()) {
259                Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next();
260                if (!inUse.contains(ackReference.getKey())) {
261                    // should we keep this data file
262                    for (Integer referencedFileId : ackReference.getValue()) {
263                        if (inUse.contains(referencedFileId)) {
264                            // keep this ack file
265                            inUse.add(ackReference.getKey());
266                            LOG.debug("not removing data file: " + ackReference.getKey()
267                                            + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue());
268                            break;
269                        }
270                    }
271                }
272                if (!inUse.contains(ackReference.getKey())) {
273                   ackReferences.remove();
274                }
275            }
276    
277            return inUse;
278        }
279    
280        Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
281        public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) {
282            Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId));
283            if (referenceFileIds == null) {
284                referenceFileIds = new HashSet<Integer>();
285                referenceFileIds.add(Integer.valueOf(messageFileId));
286                ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds);
287            } else {
288                Integer id = Integer.valueOf(messageFileId);
289                if (!referenceFileIds.contains(id)) {
290                    referenceFileIds.add(id);
291                }
292            }
293        }
294    
295        /**
296         *
297         * @throws IOException
298         * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
299         */
300        public void clearMessages() throws IOException {
301            //don't delete messages as it will clear state - call base
302            //class method to clear out the data instead
303            super.deleteAllMessages();
304        }
305    
306        /**
307         *
308         * @throws IOException
309         * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
310         */
311    
312        public void recoverState() throws IOException {
313            Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
314            for (SubscriptionInfo info:set) {
315                LOG.info("Recovering subscriber state for durable subscriber: " + info);
316                TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
317                ts.addSubsciption(info, false);
318            }
319        }
320        
321        public void recoverSubscription(SubscriptionInfo info) throws IOException {
322            TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
323            LOG.info("Recovering subscriber state for durable subscriber: " + info);
324            ts.addSubsciption(info, false);
325        }
326        
327    
328        public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
329            Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
330            preparedTransactions.load();
331            for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) {
332                TransactionId key = i.next();
333                AMQTx value = preparedTransactions.get(key);
334                result.put(key, value);
335            }
336            return result;
337        }
338    
339        public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException {
340            preparedTransactions.clear();
341            for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) {
342                Map.Entry<TransactionId, AMQTx> entry = iter.next();
343                preparedTransactions.put(entry.getKey(), entry.getValue());
344            }
345        }
346    
347        @Override
348        public synchronized void setDirectory(File directory) {
349            File file = new File(directory, "data");
350            super.setDirectory(file);
351            this.stateStore = createStateStore(directory);
352        }
353    
354        protected synchronized Store getStateStore() throws IOException {
355            if (this.stateStore == null) {
356                File stateDirectory = new File(getDirectory(), "kr-state");
357                IOHelper.mkdirs(stateDirectory);
358                this.stateStore = createStateStore(getDirectory());
359            }
360            return this.stateStore;
361        }
362    
363        public void deleteAllMessages() throws IOException {
364            super.deleteAllMessages();
365            if (stateStore != null) {
366                if (stateStore.isInitialized()) {
367                    stateStore.clear();
368                } else {
369                    stateStore.delete();
370                }
371            } else {
372                File stateDirectory = new File(getDirectory(), "kr-state");
373                StoreFactory.delete(stateDirectory);
374            }
375        }
376    
377        public boolean isPersistentIndex() {
378                    return persistentIndex;
379            }
380    
381            public void setPersistentIndex(boolean persistentIndex) {
382                    this.persistentIndex = persistentIndex;
383            }
384    
385        private Store createStateStore(File directory) {
386            File stateDirectory = new File(directory, "state");
387            try {
388                IOHelper.mkdirs(stateDirectory);
389                return StoreFactory.open(stateDirectory, "rw");
390            } catch (IOException e) {
391                LOG.error("Failed to create the state store", e);
392            }
393            return null;
394        }
395    
396        protected void addSubscriberState(SubscriptionInfo info) throws IOException {
397            durableSubscribers.add(info);
398        }
399    
400        protected void removeSubscriberState(SubscriptionInfo info) {
401            durableSubscribers.remove(info);
402        }
403    
404        public int getIndexBinSize() {
405            return indexBinSize;
406        }
407    
408        public void setIndexBinSize(int indexBinSize) {
409            this.indexBinSize = indexBinSize;
410        }
411    
412        public int getIndexKeySize() {
413            return indexKeySize;
414        }
415    
416        public void setIndexKeySize(int indexKeySize) {
417            this.indexKeySize = indexKeySize;
418        }
419    
420        public int getIndexPageSize() {
421            return indexPageSize;
422        }
423    
424        public void setIndexPageSize(int indexPageSize) {
425            this.indexPageSize = indexPageSize;
426        }
427    
428        public int getIndexMaxBinSize() {
429            return indexMaxBinSize;
430        }
431    
432        public void setIndexMaxBinSize(int maxBinSize) {
433            this.indexMaxBinSize = maxBinSize;
434        }
435    
436        /**
437         * @return the loadFactor
438         */
439        public int getIndexLoadFactor() {
440            return indexLoadFactor;
441        }
442    
443        /**
444         * @param loadFactor the loadFactor to set
445         */
446        public void setIndexLoadFactor(int loadFactor) {
447            this.indexLoadFactor = loadFactor;
448        }
449    
450    
451    }