001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.disk.index;
018    
019    import java.io.IOException;
020    import java.lang.ref.WeakReference;
021    import java.util.Iterator;
022    import java.util.Map;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import java.util.concurrent.atomic.AtomicLong;
025    
026    import org.apache.activemq.store.kahadb.disk.index.ListNode.ListIterator;
027    import org.apache.activemq.store.kahadb.disk.page.Page;
028    import org.apache.activemq.store.kahadb.disk.page.PageFile;
029    import org.apache.activemq.store.kahadb.disk.page.Transaction;
030    import org.apache.activemq.store.kahadb.disk.util.Marshaller;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    public class ListIndex<Key,Value> implements Index<Key,Value> {
035    
036        private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class);
037        public  final static long NOT_SET = -1;
038        protected PageFile pageFile;
039        protected long headPageId;
040        protected long tailPageId;
041        private AtomicLong size = new AtomicLong(0);
042    
043        protected AtomicBoolean loaded = new AtomicBoolean();
044    
045        private ListNode.NodeMarshaller<Key, Value> marshaller;
046        private Marshaller<Key> keyMarshaller;
047        private Marshaller<Value> valueMarshaller;
048    
049        public ListIndex() {
050        }
051    
052        public ListIndex(PageFile pageFile, long headPageId) {
053            this.pageFile = pageFile;
054            setHeadPageId(headPageId);
055        }
056    
057        @SuppressWarnings("rawtypes")
058        public ListIndex(PageFile pageFile, Page page) {
059            this(pageFile, page.getPageId());
060        }
061    
062        synchronized public void load(Transaction tx) throws IOException {
063            if (loaded.compareAndSet(false, true)) {
064                LOG.debug("loading");
065                if( keyMarshaller == null ) {
066                    throw new IllegalArgumentException("The key marshaller must be set before loading the ListIndex");
067                }
068                if( valueMarshaller == null ) {
069                    throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex");
070                }
071    
072                marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller);
073                final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null);
074                if( p.getType() == Page.PAGE_FREE_TYPE ) {
075                     // Need to initialize it..
076                    ListNode<Key, Value> root = createNode(p);
077                    storeNode(tx, root, true);
078                    setHeadPageId(p.getPageId());
079                    setTailPageId(getHeadPageId());
080                } else {
081                    ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
082                    setTailPageId(getHeadPageId());
083                    size.addAndGet(node.size(tx));
084                    while (node.getNext() != NOT_SET ) {
085                        node = loadNode(tx, node.getNext());
086                        size.addAndGet(node.size(tx));
087                        setTailPageId(node.getPageId());
088                    }
089                }
090            }
091        }
092    
093        synchronized public void unload(Transaction tx) {
094            if (loaded.compareAndSet(true, false)) {
095            }
096        }
097    
098        protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
099            return loadNode(tx, getHeadPageId());
100        }
101    
102        protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
103            return loadNode(tx, getTailPageId());
104        }
105    
106        synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
107            assertLoaded();
108    
109            if (size.get() == 0) {
110                return false;
111            }
112    
113            for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
114                Map.Entry<Key,Value> candidate = iterator.next();
115                if (key.equals(candidate.getKey())) {
116                    return true;
117                }
118            }
119            return false;
120        }
121    
122        private ListNode<Key, Value> lastGetNodeCache = null;
123        private Map.Entry<Key, Value> lastGetEntryCache = null;
124        private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
125    
126        @SuppressWarnings({ "rawtypes", "unchecked" })
127        synchronized public Value get(Transaction tx, Key key) throws IOException {
128            assertLoaded();
129            for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
130                Map.Entry<Key,Value> candidate = iterator.next();
131                if (key.equals(candidate.getKey())) {
132                    this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
133                    this.lastGetEntryCache = candidate;
134                    this.lastCacheTxSrc = new WeakReference<Transaction>(tx);
135                    return candidate.getValue();
136                }
137            }
138            return null;
139        }
140    
141        /**
142         * Update the value of the item with the given key in the list if ot exists, otherwise
143         * it appends the value to the end of the list.
144         *
145         * @return the old value contained in the list if one exists or null.
146         */
147        @SuppressWarnings({ "rawtypes" })
148        synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
149    
150            Value oldValue = null;
151    
152            if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
153    
154                if(lastGetEntryCache.getKey().equals(key)) {
155                    oldValue = lastGetEntryCache.setValue(value);
156                    lastGetEntryCache.setValue(value);
157                    lastGetNodeCache.storeUpdate(tx);
158                    flushCache();
159                    return oldValue;
160                }
161    
162                // This searches from the last location of a call to get for the element to replace
163                // all the way to the end of the ListIndex.
164                Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
165                while (iterator.hasNext()) {
166                    Map.Entry<Key, Value> entry = iterator.next();
167                    if (entry.getKey().equals(key)) {
168                        oldValue = entry.setValue(value);
169                        ((ListIterator) iterator).getCurrent().storeUpdate(tx);
170                        flushCache();
171                        return oldValue;
172                    }
173                }
174            } else {
175                flushCache();
176            }
177    
178            // Not found because the cache wasn't set or its not at the end of the list so we
179            // start from the beginning and go to the cached location or the end, then we do
180            // an add if its not found.
181            Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
182            while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
183                Map.Entry<Key, Value> entry = iterator.next();
184                if (entry.getKey().equals(key)) {
185                    oldValue = entry.setValue(value);
186                    ((ListIterator) iterator).getCurrent().storeUpdate(tx);
187                    flushCache();
188                    return oldValue;
189                }
190            }
191    
192            // Not found so add it last.
193            flushCache();
194    
195            return add(tx, key, value);
196        }
197    
198        synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
199            assertLoaded();
200            getTail(tx).put(tx, key, value);
201            size.incrementAndGet();
202            flushCache();
203            return null;
204        }
205    
206        synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
207            assertLoaded();
208            getHead(tx).addFirst(tx, key, value);
209            size.incrementAndGet();
210            flushCache();
211            return null;
212        }
213    
214        @SuppressWarnings("rawtypes")
215        synchronized public Value remove(Transaction tx, Key key) throws IOException {
216            assertLoaded();
217    
218            if (size.get() == 0) {
219                return null;
220            }
221    
222            if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
223    
224                // This searches from the last location of a call to get for the element to remove
225                // all the way to the end of the ListIndex.
226                Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
227                while (iterator.hasNext()) {
228                    Map.Entry<Key, Value> entry = iterator.next();
229                    if (entry.getKey().equals(key)) {
230                        iterator.remove();
231                        flushCache();
232                        return entry.getValue();
233                    }
234                }
235            } else {
236                flushCache();
237            }
238    
239            // Not found because the cache wasn't set or its not at the end of the list so we
240            // start from the beginning and go to the cached location or the end to find the
241            // element to remove.
242            Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
243            while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
244                Map.Entry<Key, Value> entry = iterator.next();
245                if (entry.getKey().equals(key)) {
246                    iterator.remove();
247                    flushCache();
248                    return entry.getValue();
249                }
250            }
251    
252            return null;
253        }
254    
255        public void onRemove() {
256            size.decrementAndGet();
257            flushCache();
258        }
259    
260        public boolean isTransient() {
261            return false;
262        }
263    
264        synchronized public void clear(Transaction tx) throws IOException {
265            for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
266                ListNode<Key,Value>candidate = iterator.next();
267                candidate.clear(tx);
268                // break up the transaction
269                tx.commit();
270            }
271            flushCache();
272            size.set(0);
273        }
274    
275        synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
276            return getHead(tx).listNodeIterator(tx);
277        }
278    
279        synchronized public boolean isEmpty(final Transaction tx) throws IOException {
280            return getHead(tx).isEmpty(tx);
281        }
282    
283        synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
284            return getHead(tx).iterator(tx);
285        }
286    
287        synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
288            return getHead(tx).iterator(tx, initialPosition);
289        }
290    
291        synchronized public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException {
292            return getHead(tx).getFirst(tx);
293        }
294    
295        synchronized public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
296            return getTail(tx).getLast(tx);
297        }
298    
299        private void assertLoaded() throws IllegalStateException {
300            if( !loaded.get() ) {
301                throw new IllegalStateException("TheListIndex is not loaded");
302            }
303        }
304    
305        ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
306            Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
307            ListNode<Key, Value> node = page.get();
308            node.setPage(page);
309            node.setContainingList(this);
310            return node;
311        }
312    
313        ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
314            ListNode<Key,Value> node = new ListNode<Key,Value>();
315            node.setPage(page);
316            page.set(node);
317            node.setContainingList(this);
318            return node;
319        }
320    
321        public ListNode<Key,Value> createNode(Transaction tx) throws IOException {
322            return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null));
323        }
324    
325        public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
326            tx.store(node.getPage(), marshaller, overflow);
327            flushCache();
328        }
329    
330        public PageFile getPageFile() {
331            return pageFile;
332        }
333    
334        public void setPageFile(PageFile pageFile) {
335            this.pageFile = pageFile;
336        }
337    
338        public long getHeadPageId() {
339            return headPageId;
340        }
341    
342        public void setHeadPageId(long headPageId) {
343            this.headPageId = headPageId;
344        }
345    
346        public Marshaller<Key> getKeyMarshaller() {
347            return keyMarshaller;
348        }
349        public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
350            this.keyMarshaller = keyMarshaller;
351        }
352    
353        public Marshaller<Value> getValueMarshaller() {
354            return valueMarshaller;
355        }
356        public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
357            this.valueMarshaller = valueMarshaller;
358        }
359    
360        public void setTailPageId(long tailPageId) {
361            this.tailPageId = tailPageId;
362        }
363    
364        public long getTailPageId() {
365           return tailPageId;
366        }
367    
368        public long size() {
369            return size.get();
370        }
371    
372        private void flushCache() {
373            this.lastGetEntryCache = null;
374            this.lastGetNodeCache = null;
375            this.lastCacheTxSrc.clear();
376        }
377    }