001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.container;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.Collection;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Set;
025    
026    import org.apache.activemq.kaha.ContainerId;
027    import org.apache.activemq.kaha.IndexMBean;
028    import org.apache.activemq.kaha.MapContainer;
029    import org.apache.activemq.kaha.Marshaller;
030    import org.apache.activemq.kaha.RuntimeStoreException;
031    import org.apache.activemq.kaha.Store;
032    import org.apache.activemq.kaha.StoreEntry;
033    import org.apache.activemq.kaha.StoreLocation;
034    import org.apache.activemq.kaha.impl.DataManager;
035    import org.apache.activemq.kaha.impl.data.Item;
036    import org.apache.activemq.kaha.impl.index.Index;
037    import org.apache.activemq.kaha.impl.index.IndexItem;
038    import org.apache.activemq.kaha.impl.index.IndexLinkedList;
039    import org.apache.activemq.kaha.impl.index.IndexManager;
040    import org.apache.activemq.kaha.impl.index.VMIndex;
041    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * Implementation of a MapContainer
047     * 
048     * 
049     */
050    public final class MapContainerImpl extends BaseContainerImpl implements MapContainer {
051    
052        private static final Logger LOG = LoggerFactory.getLogger(MapContainerImpl.class);
053        protected Index index;
054        protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
055        protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
056        protected File directory;
057        private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
058        private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
059        private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
060        private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
061        private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
062    
063        public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
064                                DataManager dataManager, boolean persistentIndex) {
065            super(id, root, indexManager, dataManager, persistentIndex);
066            this.directory = directory;
067        }
068    
069        public synchronized void init() {
070            super.init();
071            if (index == null) {
072                if (persistentIndex) {
073                    String name = containerId.getDataContainerName() + "_" + containerId.getKey();
074                    try {
075                        HashIndex hashIndex = new HashIndex(directory, name, indexManager);
076                        hashIndex.setNumberOfBins(getIndexBinSize());
077                        hashIndex.setKeySize(getIndexKeySize());
078                        hashIndex.setPageSize(getIndexPageSize());
079                        hashIndex.setMaximumCapacity(getIndexMaxBinSize());
080                        hashIndex.setLoadFactor(getIndexLoadFactor());
081                        this.index = hashIndex;
082                    } catch (IOException e) {
083                        LOG.error("Failed to create HashIndex", e);
084                        throw new RuntimeException(e);
085                    }
086                } else {
087                    this.index = new VMIndex(indexManager);
088                }
089            }
090            index.setKeyMarshaller(keyMarshaller);
091        }
092    
093        /*
094         * (non-Javadoc)
095         * 
096         * @see org.apache.activemq.kaha.MapContainer#load()
097         */
098        public synchronized void load() {
099            checkClosed();
100            if (!loaded) {
101                if (!loaded) {
102                    loaded = true;
103                    try {
104                        init();
105                        index.load();
106                        long nextItem = root.getNextItem();
107                        while (nextItem != Item.POSITION_NOT_SET) {
108                            IndexItem item = indexManager.getIndex(nextItem);
109                            StoreLocation data = item.getKeyDataItem();
110                            Object key = dataManager.readItem(keyMarshaller, data);
111                            if (index.isTransient()) {
112                                index.store(key, item);
113                            }
114                            indexList.add(item);
115                            nextItem = item.getNextItem();
116                        }
117                    } catch (IOException e) {
118                        LOG.error("Failed to load container " + getId(), e);
119                        throw new RuntimeStoreException(e);
120                    }
121                }
122            }
123        }
124    
125        /*
126         * (non-Javadoc)
127         * 
128         * @see org.apache.activemq.kaha.MapContainer#unload()
129         */
130        public synchronized void unload() {
131            checkClosed();
132            if (loaded) {
133                loaded = false;
134                try {
135                    index.unload();
136                } catch (IOException e) {
137                    LOG.warn("Failed to unload the index", e);
138                }
139                indexList.clear();
140            }
141        }
142    
143        public synchronized void delete() {
144            unload();
145            try {
146                index.delete();
147            } catch (IOException e) {
148                LOG.warn("Failed to unload the index", e);
149            }
150        }
151    
152    
153        public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
154            checkClosed();
155            this.keyMarshaller = keyMarshaller;
156            if (index != null) {
157                index.setKeyMarshaller(keyMarshaller);
158            }
159        }
160    
161        public synchronized void setValueMarshaller(Marshaller valueMarshaller) {
162            checkClosed();
163            this.valueMarshaller = valueMarshaller;
164        }
165    
166        /*
167         * (non-Javadoc)
168         * 
169         * @see org.apache.activemq.kaha.MapContainer#size()
170         */
171        public synchronized int size() {
172            load();
173            return indexList.size();
174        }
175    
176        /*
177         * (non-Javadoc)
178         * 
179         * @see org.apache.activemq.kaha.MapContainer#isEmpty()
180         */
181        public synchronized boolean isEmpty() {
182            load();
183            return indexList.isEmpty();
184        }
185    
186        /*
187         * (non-Javadoc)
188         * 
189         * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
190         */
191        public synchronized boolean containsKey(Object key) {
192            load();
193            try {
194                return index.containsKey(key);
195            } catch (IOException e) {
196                LOG.error("Failed trying to find key: " + key, e);
197                throw new RuntimeException(e);
198            }
199        }
200    
201        /*
202         * (non-Javadoc)
203         * 
204         * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
205         */
206        public synchronized Object get(Object key) {
207            load();
208            Object result = null;
209            StoreEntry item = null;
210            try {
211                item = index.get(key);
212            } catch (IOException e) {
213                LOG.error("Failed trying to get key: " + key, e);
214                throw new RuntimeException(e);
215            }
216            if (item != null) {
217                result = getValue(item);
218            }
219            return result;
220        }
221    
222        /**
223         * Get the StoreEntry associated with the key
224         * 
225         * @param key
226         * @return the StoreEntry
227         */
228        public synchronized StoreEntry getEntry(Object key) {
229            load();
230            StoreEntry item = null;
231            try {
232                item = index.get(key);
233            } catch (IOException e) {
234                LOG.error("Failed trying to get key: " + key, e);
235                throw new RuntimeException(e);
236            }
237            return item;
238        }
239    
240        /*
241         * (non-Javadoc)
242         * 
243         * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
244         */
245        public synchronized boolean containsValue(Object o) {
246            load();
247            boolean result = false;
248            if (o != null) {
249                IndexItem item = indexList.getFirst();
250                while (item != null) {
251                    Object value = getValue(item);
252                    if (value != null && value.equals(o)) {
253                        result = true;
254                        break;
255                    }
256                    item = indexList.getNextEntry(item);
257                }
258            }
259            return result;
260        }
261    
262        /*
263         * (non-Javadoc)
264         * 
265         * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
266         */
267        public synchronized void putAll(Map t) {
268            load();
269            if (t != null) {
270                for (Iterator i = t.entrySet().iterator(); i.hasNext();) {
271                    Map.Entry entry = (Map.Entry)i.next();
272                    put(entry.getKey(), entry.getValue());
273                }
274            }
275        }
276    
277        /*
278         * (non-Javadoc)
279         * 
280         * @see org.apache.activemq.kaha.MapContainer#keySet()
281         */
282        public synchronized Set keySet() {
283            load();
284            return new ContainerKeySet(this);
285        }
286    
287        /*
288         * (non-Javadoc)
289         * 
290         * @see org.apache.activemq.kaha.MapContainer#values()
291         */
292        public synchronized Collection values() {
293            load();
294            return new ContainerValueCollection(this);
295        }
296    
297        /*
298         * (non-Javadoc)
299         * 
300         * @see org.apache.activemq.kaha.MapContainer#entrySet()
301         */
302        public synchronized Set entrySet() {
303            load();
304            return new ContainerEntrySet(this);
305        }
306    
307        /*
308         * (non-Javadoc)
309         * 
310         * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
311         *      java.lang.Object)
312         */
313        public synchronized Object put(Object key, Object value) {
314            load();
315            Object result = remove(key);
316            IndexItem item = write(key, value);
317            try {
318                index.store(key, item);
319            } catch (IOException e) {
320                LOG.error("Failed trying to insert key: " + key, e);
321                throw new RuntimeException(e);
322            }
323            indexList.add(item);
324            return result;
325        }
326    
327        /*
328         * (non-Javadoc)
329         * 
330         * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
331         */
332        public synchronized Object remove(Object key) {
333            load();
334            try {
335                Object result = null;
336                IndexItem item = (IndexItem)index.remove(key);
337                if (item != null) {
338                    // refresh the index
339                    item = (IndexItem)indexList.refreshEntry(item);
340                    result = getValue(item);
341                    IndexItem prev = indexList.getPrevEntry(item);
342                    IndexItem next = indexList.getNextEntry(item);
343                    indexList.remove(item);
344                    delete(item, prev, next);
345                }
346                return result;
347            } catch (IOException e) {
348                LOG.error("Failed trying to remove key: " + key, e);
349                throw new RuntimeException(e);
350            }
351        }
352    
353        public synchronized boolean removeValue(Object o) {
354            load();
355            boolean result = false;
356            if (o != null) {
357                IndexItem item = indexList.getFirst();
358                while (item != null) {
359                    Object value = getValue(item);
360                    if (value != null && value.equals(o)) {
361                        result = true;
362                        // find the key
363                        Object key = getKey(item);
364                        if (key != null) {
365                            remove(key);
366                        }
367                        break;
368                    }
369                    item = indexList.getNextEntry(item);
370                }
371            }
372            return result;
373        }
374    
375        protected synchronized void remove(IndexItem item) {
376            Object key = getKey(item);
377            if (key != null) {
378                remove(key);
379            }
380        }
381    
382        /*
383         * (non-Javadoc)
384         * 
385         * @see org.apache.activemq.kaha.MapContainer#clear()
386         */
387        public synchronized void clear() {
388            checkClosed();
389            loaded = true;
390            init();
391            if (index != null) {
392                try {
393                    index.clear();
394                } catch (IOException e) {
395                    LOG.error("Failed trying clear index", e);
396                    throw new RuntimeException(e);
397                }
398            }
399            super.clear();
400            doClear();
401        }
402    
403        /**
404         * Add an entry to the Store Map
405         * 
406         * @param key
407         * @param value
408         * @return the StoreEntry associated with the entry
409         */
410        public synchronized StoreEntry place(Object key, Object value) {
411            load();
412            try {
413                remove(key);
414                IndexItem item = write(key, value);
415                index.store(key, item);
416                indexList.add(item);
417                return item;
418            } catch (IOException e) {
419                LOG.error("Failed trying to place key: " + key, e);
420                throw new RuntimeException(e);
421            }
422        }
423    
424        /**
425         * Remove an Entry from ther Map
426         * 
427         * @param entry
428         * @throws IOException
429         */
430        public synchronized void remove(StoreEntry entry) {
431            load();
432            IndexItem item = (IndexItem)entry;
433            if (item != null) {
434                Object key = getKey(item);
435                try {
436                    index.remove(key);
437                } catch (IOException e) {
438                    LOG.error("Failed trying to remove entry: " + entry, e);
439                    throw new RuntimeException(e);
440                }
441                IndexItem prev = indexList.getPrevEntry(item);
442                IndexItem next = indexList.getNextEntry(item);
443                indexList.remove(item);
444                delete(item, prev, next);
445            }
446        }
447    
448        public synchronized StoreEntry getFirst() {
449            load();
450            return indexList.getFirst();
451        }
452    
453        public synchronized StoreEntry getLast() {
454            load();
455            return indexList.getLast();
456        }
457    
458        public synchronized StoreEntry getNext(StoreEntry entry) {
459            load();
460            IndexItem item = (IndexItem)entry;
461            return indexList.getNextEntry(item);
462        }
463    
464        public synchronized StoreEntry getPrevious(StoreEntry entry) {
465            load();
466            IndexItem item = (IndexItem)entry;
467            return indexList.getPrevEntry(item);
468        }
469    
470        public synchronized StoreEntry refresh(StoreEntry entry) {
471            load();
472            return indexList.getEntry(entry);
473        }
474    
475        /**
476         * Get the value from it's location
477         * 
478         * @param item
479         * @return the value associated with the store entry
480         */
481        public synchronized Object getValue(StoreEntry item) {
482            load();
483            Object result = null;
484            if (item != null) {
485                try {
486                    // ensure this value is up to date
487                    // item=indexList.getEntry(item);
488                    StoreLocation data = item.getValueDataItem();
489                    result = dataManager.readItem(valueMarshaller, data);
490                } catch (IOException e) {
491                    LOG.error("Failed to get value for " + item, e);
492                    throw new RuntimeStoreException(e);
493                }
494            }
495            return result;
496        }
497    
498        /**
499         * Get the Key object from it's location
500         * 
501         * @param item
502         * @return the Key Object associated with the StoreEntry
503         */
504        public synchronized Object getKey(StoreEntry item) {
505            load();
506            Object result = null;
507            if (item != null) {
508                try {
509                    StoreLocation data = item.getKeyDataItem();
510                    result = dataManager.readItem(keyMarshaller, data);
511                } catch (IOException e) {
512                    LOG.error("Failed to get key for " + item, e);
513                    throw new RuntimeStoreException(e);
514                }
515            }
516            return result;
517        }
518    
519        protected IndexLinkedList getItemList() {
520            return indexList;
521        }
522    
523        protected synchronized IndexItem write(Object key, Object value) {
524            IndexItem index = null;
525            try {
526                index = indexManager.createNewIndex();
527                StoreLocation data = dataManager.storeDataItem(keyMarshaller, key);
528                index.setKeyData(data);
529    
530                if (value != null) {
531                    data = dataManager.storeDataItem(valueMarshaller, value);
532                    index.setValueData(data);
533                }
534                IndexItem prev = indexList.getLast();
535                prev = prev != null ? prev : indexList.getRoot();
536                IndexItem next = indexList.getNextEntry(prev);
537                prev.setNextItem(index.getOffset());
538                index.setPreviousItem(prev.getOffset());
539                updateIndexes(prev);
540                if (next != null) {
541                    next.setPreviousItem(index.getOffset());
542                    index.setNextItem(next.getOffset());
543                    updateIndexes(next);
544                }
545                storeIndex(index);
546            } catch (IOException e) {
547                LOG.error("Failed to write " + key + " , " + value, e);
548                throw new RuntimeStoreException(e);
549            }
550            return index;
551        }
552    
553        public int getIndexBinSize() {
554            return indexBinSize;
555        }
556    
557        public void setIndexBinSize(int indexBinSize) {
558            this.indexBinSize = indexBinSize;
559        }
560    
561        public int getIndexKeySize() {
562            return indexKeySize;
563        }
564    
565        public void setIndexKeySize(int indexKeySize) {
566            this.indexKeySize = indexKeySize;
567        }
568    
569        public int getIndexPageSize() {
570            return indexPageSize;
571        }
572    
573        public void setIndexPageSize(int indexPageSize) {
574            this.indexPageSize = indexPageSize;
575        }
576        
577        public int getIndexLoadFactor() {
578            return indexLoadFactor;
579        }
580    
581        public void setIndexLoadFactor(int loadFactor) {
582            this.indexLoadFactor = loadFactor;
583        }
584    
585      
586        public IndexMBean getIndexMBean() {
587          return (IndexMBean) index;
588        }
589        public int getIndexMaxBinSize() {
590            return indexMaxBinSize;
591        }
592    
593        public void setIndexMaxBinSize(int maxBinSize) {
594            this.indexMaxBinSize = maxBinSize;
595        }
596       
597    
598       
599        public String toString() {
600            load();
601            StringBuffer buf = new StringBuffer();
602            buf.append("{");
603            Iterator i = entrySet().iterator();
604            boolean hasNext = i.hasNext();
605            while (hasNext) {
606                Map.Entry e = (Entry) i.next();
607                Object key = e.getKey();
608                Object value = e.getValue();
609                buf.append(key);
610                buf.append("=");
611    
612                buf.append(value);
613                hasNext = i.hasNext();
614                if (hasNext)
615                    buf.append(", ");
616            }
617            buf.append("}");
618            return buf.toString();
619        }    
620    }