001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.container;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.Collection;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Set;
025    
026    import org.apache.activemq.kaha.ContainerId;
027    import org.apache.activemq.kaha.IndexMBean;
028    import org.apache.activemq.kaha.MapContainer;
029    import org.apache.activemq.kaha.Marshaller;
030    import org.apache.activemq.kaha.RuntimeStoreException;
031    import org.apache.activemq.kaha.Store;
032    import org.apache.activemq.kaha.StoreEntry;
033    import org.apache.activemq.kaha.StoreLocation;
034    import org.apache.activemq.kaha.impl.DataManager;
035    import org.apache.activemq.kaha.impl.data.Item;
036    import org.apache.activemq.kaha.impl.index.Index;
037    import org.apache.activemq.kaha.impl.index.IndexItem;
038    import org.apache.activemq.kaha.impl.index.IndexLinkedList;
039    import org.apache.activemq.kaha.impl.index.IndexManager;
040    import org.apache.activemq.kaha.impl.index.VMIndex;
041    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
042    import org.apache.activemq.util.IOHelper;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * Implementation of a MapContainer
048     * 
049     * 
050     */
051    public final class MapContainerImpl extends BaseContainerImpl implements MapContainer {
052    
053        private static final Logger LOG = LoggerFactory.getLogger(MapContainerImpl.class);
054        protected Index index;
055        protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
056        protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
057        protected File directory;
058        private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
059        private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
060        private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
061        private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
062        private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
063    
/**
 * Creates a map container.
 *
 * @param directory directory later passed to the HashIndex constructor by init()
 * @param id forwarded to the superclass constructor
 * @param root forwarded to the superclass constructor
 * @param indexManager forwarded to the superclass constructor
 * @param dataManager forwarded to the superclass constructor
 * @param persistentIndex forwarded to the superclass constructor
 */
public MapContainerImpl(File directory,
                        ContainerId id,
                        IndexItem root,
                        IndexManager indexManager,
                        DataManager dataManager,
                        boolean persistentIndex) {
    super(id, root, indexManager, dataManager, persistentIndex);
    this.directory = directory;
}
069    
070        public synchronized void init() {
071            super.init();
072            if (index == null) {
073                if (persistentIndex) {
074                    String name = containerId.getDataContainerName() + "_" + containerId.getKey();
075                    try {
076                        HashIndex hashIndex = new HashIndex(directory, name, indexManager);
077                        hashIndex.setNumberOfBins(getIndexBinSize());
078                        hashIndex.setKeySize(getIndexKeySize());
079                        hashIndex.setPageSize(getIndexPageSize());
080                        hashIndex.setMaximumCapacity(getIndexMaxBinSize());
081                        hashIndex.setLoadFactor(getIndexLoadFactor());
082                        this.index = hashIndex;
083                    } catch (IOException e) {
084                        LOG.error("Failed to create HashIndex", e);
085                        throw new RuntimeException(e);
086                    }
087                } else {
088                    this.index = new VMIndex(indexManager);
089                }
090            }
091            index.setKeyMarshaller(keyMarshaller);
092        }
093    
094        /*
095         * (non-Javadoc)
096         * 
097         * @see org.apache.activemq.kaha.MapContainer#load()
098         */
099        public synchronized void load() {
100            checkClosed();
101            if (!loaded) {
102                if (!loaded) {
103                    loaded = true;
104                    try {
105                        init();
106                        index.load();
107                        long nextItem = root.getNextItem();
108                        while (nextItem != Item.POSITION_NOT_SET) {
109                            IndexItem item = indexManager.getIndex(nextItem);
110                            StoreLocation data = item.getKeyDataItem();
111                            Object key = dataManager.readItem(keyMarshaller, data);
112                            if (index.isTransient()) {
113                                index.store(key, item);
114                            }
115                            indexList.add(item);
116                            nextItem = item.getNextItem();
117                        }
118                    } catch (IOException e) {
119                        LOG.error("Failed to load container " + getId(), e);
120                        throw new RuntimeStoreException(e);
121                    }
122                }
123            }
124        }
125    
126        /*
127         * (non-Javadoc)
128         * 
129         * @see org.apache.activemq.kaha.MapContainer#unload()
130         */
131        public synchronized void unload() {
132            checkClosed();
133            if (loaded) {
134                loaded = false;
135                try {
136                    index.unload();
137                } catch (IOException e) {
138                    LOG.warn("Failed to unload the index", e);
139                }
140                indexList.clear();
141            }
142        }
143    
144        public synchronized void delete() {
145            unload();
146            try {
147                index.delete();
148            } catch (IOException e) {
149                LOG.warn("Failed to unload the index", e);
150            }
151        }
152    
153    
154        public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
155            checkClosed();
156            this.keyMarshaller = keyMarshaller;
157            if (index != null) {
158                index.setKeyMarshaller(keyMarshaller);
159            }
160        }
161    
162        public synchronized void setValueMarshaller(Marshaller valueMarshaller) {
163            checkClosed();
164            this.valueMarshaller = valueMarshaller;
165        }
166    
167        /*
168         * (non-Javadoc)
169         * 
170         * @see org.apache.activemq.kaha.MapContainer#size()
171         */
172        public synchronized int size() {
173            load();
174            return indexList.size();
175        }
176    
177        /*
178         * (non-Javadoc)
179         * 
180         * @see org.apache.activemq.kaha.MapContainer#isEmpty()
181         */
182        public synchronized boolean isEmpty() {
183            load();
184            return indexList.isEmpty();
185        }
186    
187        /*
188         * (non-Javadoc)
189         * 
190         * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
191         */
192        public synchronized boolean containsKey(Object key) {
193            load();
194            try {
195                return index.containsKey(key);
196            } catch (IOException e) {
197                LOG.error("Failed trying to find key: " + key, e);
198                throw new RuntimeException(e);
199            }
200        }
201    
202        /*
203         * (non-Javadoc)
204         * 
205         * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
206         */
207        public synchronized Object get(Object key) {
208            load();
209            Object result = null;
210            StoreEntry item = null;
211            try {
212                item = index.get(key);
213            } catch (IOException e) {
214                LOG.error("Failed trying to get key: " + key, e);
215                throw new RuntimeException(e);
216            }
217            if (item != null) {
218                result = getValue(item);
219            }
220            return result;
221        }
222    
223        /**
224         * Get the StoreEntry associated with the key
225         * 
226         * @param key
227         * @return the StoreEntry
228         */
229        public synchronized StoreEntry getEntry(Object key) {
230            load();
231            StoreEntry item = null;
232            try {
233                item = index.get(key);
234            } catch (IOException e) {
235                LOG.error("Failed trying to get key: " + key, e);
236                throw new RuntimeException(e);
237            }
238            return item;
239        }
240    
241        /*
242         * (non-Javadoc)
243         * 
244         * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
245         */
246        public synchronized boolean containsValue(Object o) {
247            load();
248            boolean result = false;
249            if (o != null) {
250                IndexItem item = indexList.getFirst();
251                while (item != null) {
252                    Object value = getValue(item);
253                    if (value != null && value.equals(o)) {
254                        result = true;
255                        break;
256                    }
257                    item = indexList.getNextEntry(item);
258                }
259            }
260            return result;
261        }
262    
263        /*
264         * (non-Javadoc)
265         * 
266         * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
267         */
268        public synchronized void putAll(Map t) {
269            load();
270            if (t != null) {
271                for (Iterator i = t.entrySet().iterator(); i.hasNext();) {
272                    Map.Entry entry = (Map.Entry)i.next();
273                    put(entry.getKey(), entry.getValue());
274                }
275            }
276        }
277    
278        /*
279         * (non-Javadoc)
280         * 
281         * @see org.apache.activemq.kaha.MapContainer#keySet()
282         */
283        public synchronized Set keySet() {
284            load();
285            return new ContainerKeySet(this);
286        }
287    
288        /*
289         * (non-Javadoc)
290         * 
291         * @see org.apache.activemq.kaha.MapContainer#values()
292         */
293        public synchronized Collection values() {
294            load();
295            return new ContainerValueCollection(this);
296        }
297    
298        /*
299         * (non-Javadoc)
300         * 
301         * @see org.apache.activemq.kaha.MapContainer#entrySet()
302         */
303        public synchronized Set entrySet() {
304            load();
305            return new ContainerEntrySet(this);
306        }
307    
308        /*
309         * (non-Javadoc)
310         * 
311         * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
312         *      java.lang.Object)
313         */
314        public synchronized Object put(Object key, Object value) {
315            load();
316            Object result = remove(key);
317            IndexItem item = write(key, value);
318            try {
319                index.store(key, item);
320            } catch (IOException e) {
321                LOG.error("Failed trying to insert key: " + key, e);
322                throw new RuntimeException(e);
323            }
324            indexList.add(item);
325            return result;
326        }
327    
328        /*
329         * (non-Javadoc)
330         * 
331         * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
332         */
333        public synchronized Object remove(Object key) {
334            load();
335            try {
336                Object result = null;
337                IndexItem item = (IndexItem)index.remove(key);
338                if (item != null) {
339                    // refresh the index
340                    item = (IndexItem)indexList.refreshEntry(item);
341                    result = getValue(item);
342                    IndexItem prev = indexList.getPrevEntry(item);
343                    IndexItem next = indexList.getNextEntry(item);
344                    indexList.remove(item);
345                    delete(item, prev, next);
346                }
347                return result;
348            } catch (IOException e) {
349                LOG.error("Failed trying to remove key: " + key, e);
350                throw new RuntimeException(e);
351            }
352        }
353    
354        public synchronized boolean removeValue(Object o) {
355            load();
356            boolean result = false;
357            if (o != null) {
358                IndexItem item = indexList.getFirst();
359                while (item != null) {
360                    Object value = getValue(item);
361                    if (value != null && value.equals(o)) {
362                        result = true;
363                        // find the key
364                        Object key = getKey(item);
365                        if (key != null) {
366                            remove(key);
367                        }
368                        break;
369                    }
370                    item = indexList.getNextEntry(item);
371                }
372            }
373            return result;
374        }
375    
376        protected synchronized void remove(IndexItem item) {
377            Object key = getKey(item);
378            if (key != null) {
379                remove(key);
380            }
381        }
382    
383        /*
384         * (non-Javadoc)
385         * 
386         * @see org.apache.activemq.kaha.MapContainer#clear()
387         */
388        public synchronized void clear() {
389            checkClosed();
390            loaded = true;
391            init();
392            if (index != null) {
393                try {
394                    index.clear();
395                } catch (IOException e) {
396                    LOG.error("Failed trying clear index", e);
397                    throw new RuntimeException(e);
398                }
399            }
400            super.clear();
401            doClear();
402        }
403    
404        /**
405         * Add an entry to the Store Map
406         * 
407         * @param key
408         * @param value
409         * @return the StoreEntry associated with the entry
410         */
411        public synchronized StoreEntry place(Object key, Object value) {
412            load();
413            try {
414                remove(key);
415                IndexItem item = write(key, value);
416                index.store(key, item);
417                indexList.add(item);
418                return item;
419            } catch (IOException e) {
420                LOG.error("Failed trying to place key: " + key, e);
421                throw new RuntimeException(e);
422            }
423        }
424    
/**
 * Remove an Entry from the Map
 * 
 * @param entry the entry to remove; a null entry is ignored
 * @throws RuntimeException if removing the key from the index fails with an IOException
 */
431        public synchronized void remove(StoreEntry entry) {
432            load();
433            IndexItem item = (IndexItem)entry;
434            if (item != null) {
435                Object key = getKey(item);
436                try {
437                    index.remove(key);
438                } catch (IOException e) {
439                    LOG.error("Failed trying to remove entry: " + entry, e);
440                    throw new RuntimeException(e);
441                }
442                IndexItem prev = indexList.getPrevEntry(item);
443                IndexItem next = indexList.getNextEntry(item);
444                indexList.remove(item);
445                delete(item, prev, next);
446            }
447        }
448    
449        public synchronized StoreEntry getFirst() {
450            load();
451            return indexList.getFirst();
452        }
453    
454        public synchronized StoreEntry getLast() {
455            load();
456            return indexList.getLast();
457        }
458    
459        public synchronized StoreEntry getNext(StoreEntry entry) {
460            load();
461            IndexItem item = (IndexItem)entry;
462            return indexList.getNextEntry(item);
463        }
464    
465        public synchronized StoreEntry getPrevious(StoreEntry entry) {
466            load();
467            IndexItem item = (IndexItem)entry;
468            return indexList.getPrevEntry(item);
469        }
470    
471        public synchronized StoreEntry refresh(StoreEntry entry) {
472            load();
473            return indexList.getEntry(entry);
474        }
475    
/**
 * Get the value from its location
 * 
 * @param item
 * @return the value associated with the store entry
 */
482        public synchronized Object getValue(StoreEntry item) {
483            load();
484            Object result = null;
485            if (item != null) {
486                try {
487                    // ensure this value is up to date
488                    // item=indexList.getEntry(item);
489                    StoreLocation data = item.getValueDataItem();
490                    result = dataManager.readItem(valueMarshaller, data);
491                } catch (IOException e) {
492                    LOG.error("Failed to get value for " + item, e);
493                    throw new RuntimeStoreException(e);
494                }
495            }
496            return result;
497        }
498    
/**
 * Get the Key object from its location
 * 
 * @param item
 * @return the Key Object associated with the StoreEntry
 */
505        public synchronized Object getKey(StoreEntry item) {
506            load();
507            Object result = null;
508            if (item != null) {
509                try {
510                    StoreLocation data = item.getKeyDataItem();
511                    result = dataManager.readItem(keyMarshaller, data);
512                } catch (IOException e) {
513                    LOG.error("Failed to get key for " + item, e);
514                    throw new RuntimeStoreException(e);
515                }
516            }
517            return result;
518        }
519    
520        protected IndexLinkedList getItemList() {
521            return indexList;
522        }
523    
524        protected synchronized IndexItem write(Object key, Object value) {
525            IndexItem index = null;
526            try {
527                index = indexManager.createNewIndex();
528                StoreLocation data = dataManager.storeDataItem(keyMarshaller, key);
529                index.setKeyData(data);
530    
531                if (value != null) {
532                    data = dataManager.storeDataItem(valueMarshaller, value);
533                    index.setValueData(data);
534                }
535                IndexItem prev = indexList.getLast();
536                prev = prev != null ? prev : indexList.getRoot();
537                IndexItem next = indexList.getNextEntry(prev);
538                prev.setNextItem(index.getOffset());
539                index.setPreviousItem(prev.getOffset());
540                updateIndexes(prev);
541                if (next != null) {
542                    next.setPreviousItem(index.getOffset());
543                    index.setNextItem(next.getOffset());
544                    updateIndexes(next);
545                }
546                storeIndex(index);
547            } catch (IOException e) {
548                LOG.error("Failed to write " + key + " , " + value, e);
549                throw new RuntimeStoreException(e);
550            }
551            return index;
552        }
553    
554        public int getIndexBinSize() {
555            return indexBinSize;
556        }
557    
558        public void setIndexBinSize(int indexBinSize) {
559            this.indexBinSize = indexBinSize;
560        }
561    
562        public int getIndexKeySize() {
563            return indexKeySize;
564        }
565    
566        public void setIndexKeySize(int indexKeySize) {
567            this.indexKeySize = indexKeySize;
568        }
569    
570        public int getIndexPageSize() {
571            return indexPageSize;
572        }
573    
574        public void setIndexPageSize(int indexPageSize) {
575            this.indexPageSize = indexPageSize;
576        }
577        
578        public int getIndexLoadFactor() {
579            return indexLoadFactor;
580        }
581    
582        public void setIndexLoadFactor(int loadFactor) {
583            this.indexLoadFactor = loadFactor;
584        }
585    
586      
587        public IndexMBean getIndexMBean() {
588          return (IndexMBean) index;
589        }
590        public int getIndexMaxBinSize() {
591            return indexMaxBinSize;
592        }
593    
594        public void setIndexMaxBinSize(int maxBinSize) {
595            this.indexMaxBinSize = maxBinSize;
596        }
597       
598    
599       
600        public String toString() {
601            load();
602            StringBuffer buf = new StringBuffer();
603            buf.append("{");
604            Iterator i = entrySet().iterator();
605            boolean hasNext = i.hasNext();
606            while (hasNext) {
607                Map.Entry e = (Entry) i.next();
608                Object key = e.getKey();
609                Object value = e.getValue();
610                buf.append(key);
611                buf.append("=");
612    
613                buf.append(value);
614                hasNext = i.hasNext();
615                if (hasNext)
616                    buf.append(", ");
617            }
618            buf.append("}");
619            return buf.toString();
620        }    
621    }