001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.index;
018
019import java.io.IOException;
020import java.lang.ref.WeakReference;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.activemq.store.kahadb.disk.index.ListNode.ListIterator;
028import org.apache.activemq.store.kahadb.disk.page.Page;
029import org.apache.activemq.store.kahadb.disk.page.PageFile;
030import org.apache.activemq.store.kahadb.disk.page.Transaction;
031import org.apache.activemq.store.kahadb.disk.util.Marshaller;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035public class ListIndex<Key,Value> implements Index<Key,Value> {
036
037    private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class);
038    public  final static long NOT_SET = -1;
039    protected PageFile pageFile;
040    protected long headPageId;
041    protected long tailPageId;
042    private AtomicLong size = new AtomicLong(0);
043
044    protected AtomicBoolean loaded = new AtomicBoolean();
045
046    private ListNode.NodeMarshaller<Key, Value> marshaller;
047    private Marshaller<Key> keyMarshaller;
048    private Marshaller<Value> valueMarshaller;
049
050    public ListIndex() {
051    }
052
053    public ListIndex(PageFile pageFile, long headPageId) {
054        this.pageFile = pageFile;
055        setHeadPageId(headPageId);
056    }
057
058    @SuppressWarnings("rawtypes")
059    public ListIndex(PageFile pageFile, Page page) {
060        this(pageFile, page.getPageId());
061    }
062
063    @Override
064    synchronized public void load(Transaction tx) throws IOException {
065        if (loaded.compareAndSet(false, true)) {
066            LOG.debug("loading");
067            if( keyMarshaller == null ) {
068                throw new IllegalArgumentException("The key marshaller must be set before loading the ListIndex");
069            }
070            if( valueMarshaller == null ) {
071                throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex");
072            }
073
074            marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller);
075            final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null);
076            if( p.getType() == Page.PAGE_FREE_TYPE ) {
077                 // Need to initialize it..
078                ListNode<Key, Value> root = createNode(p);
079                storeNode(tx, root, true);
080                setHeadPageId(p.getPageId());
081                setTailPageId(getHeadPageId());
082            } else {
083                ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
084                setTailPageId(getHeadPageId());
085                size.addAndGet(node.size(tx));
086                onLoad(node, tx);
087                while (node.getNext() != NOT_SET ) {
088                    node = loadNode(tx, node.getNext());
089                    size.addAndGet(node.size(tx));
090                    onLoad(node, tx);
091                    setTailPageId(node.getPageId());
092                }
093            }
094        }
095    }
096
097    protected void onLoad(ListNode<Key, Value> node, Transaction tx) {
098
099    }
100
101    @Override
102    synchronized public void unload(Transaction tx) {
103        if (loaded.compareAndSet(true, false)) {
104        }
105    }
106
107    protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
108        return loadNode(tx, getHeadPageId());
109    }
110
111    protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
112        return loadNode(tx, getTailPageId());
113    }
114
115    @Override
116    synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
117        assertLoaded();
118
119        if (size.get() == 0) {
120            return false;
121        }
122
123        for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
124            Map.Entry<Key,Value> candidate = iterator.next();
125            if (key.equals(candidate.getKey())) {
126                return true;
127            }
128        }
129        return false;
130    }
131
132    private ListNode<Key, Value> lastGetNodeCache = null;
133    private Map.Entry<Key, Value> lastGetEntryCache = null;
134    private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
135
136    @Override
137    @SuppressWarnings({ "rawtypes", "unchecked" })
138    synchronized public Value get(Transaction tx, Key key) throws IOException {
139        assertLoaded();
140        for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
141            Map.Entry<Key,Value> candidate = iterator.next();
142            if (key.equals(candidate.getKey())) {
143                this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
144                this.lastGetEntryCache = candidate;
145                this.lastCacheTxSrc = new WeakReference<Transaction>(tx);
146                return candidate.getValue();
147            }
148        }
149        return null;
150    }
151
152    /**
153     * Update the value of the item with the given key in the list if ot exists, otherwise
154     * it appends the value to the end of the list.
155     *
156     * @return the old value contained in the list if one exists or null.
157     */
158    @Override
159    @SuppressWarnings({ "rawtypes" })
160    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
161
162        Value oldValue = null;
163
164        if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
165
166            if(lastGetEntryCache.getKey().equals(key)) {
167                oldValue = lastGetEntryCache.setValue(value);
168                lastGetEntryCache.setValue(value);
169                lastGetNodeCache.storeUpdate(tx);
170                flushCache();
171                return oldValue;
172            }
173
174            // This searches from the last location of a call to get for the element to replace
175            // all the way to the end of the ListIndex.
176            Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
177            while (iterator.hasNext()) {
178                Map.Entry<Key, Value> entry = iterator.next();
179                if (entry.getKey().equals(key)) {
180                    oldValue = entry.setValue(value);
181                    ((ListIterator) iterator).getCurrent().storeUpdate(tx);
182                    flushCache();
183                    return oldValue;
184                }
185            }
186        } else {
187            flushCache();
188        }
189
190        // Not found because the cache wasn't set or its not at the end of the list so we
191        // start from the beginning and go to the cached location or the end, then we do
192        // an add if its not found.
193        Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
194        while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
195            Map.Entry<Key, Value> entry = iterator.next();
196            if (entry.getKey().equals(key)) {
197                oldValue = entry.setValue(value);
198                ((ListIterator) iterator).getCurrent().storeUpdate(tx);
199                flushCache();
200                return oldValue;
201            }
202        }
203
204        // Not found so add it last.
205        flushCache();
206
207        return add(tx, key, value);
208    }
209
210    synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
211        assertLoaded();
212        getTail(tx).put(tx, key, value);
213        size.incrementAndGet();
214        flushCache();
215        return null;
216    }
217
218    synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
219        assertLoaded();
220        getHead(tx).addFirst(tx, key, value);
221        size.incrementAndGet();
222        flushCache();
223        return null;
224    }
225
226    @Override
227    @SuppressWarnings("rawtypes")
228    synchronized public Value remove(Transaction tx, Key key) throws IOException {
229        assertLoaded();
230
231        if (size.get() == 0) {
232            return null;
233        }
234
235        if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
236
237            // This searches from the last location of a call to get for the element to remove
238            // all the way to the end of the ListIndex.
239            Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
240            while (iterator.hasNext()) {
241                Map.Entry<Key, Value> entry = iterator.next();
242                if (entry.getKey().equals(key)) {
243                    iterator.remove();
244                    flushCache();
245                    return entry.getValue();
246                }
247            }
248        } else {
249            flushCache();
250        }
251
252        // Not found because the cache wasn't set or its not at the end of the list so we
253        // start from the beginning and go to the cached location or the end to find the
254        // element to remove.
255        Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
256        while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
257            Map.Entry<Key, Value> entry = iterator.next();
258            if (entry.getKey().equals(key)) {
259                iterator.remove();
260                flushCache();
261                return entry.getValue();
262            }
263        }
264
265        return null;
266    }
267
268    public void onRemove(Entry<Key, Value> removed) {
269        size.decrementAndGet();
270        flushCache();
271    }
272
273    @Override
274    public boolean isTransient() {
275        return false;
276    }
277
278    @Override
279    synchronized public void clear(Transaction tx) throws IOException {
280        for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
281            ListNode<Key,Value>candidate = iterator.next();
282            candidate.clear(tx);
283            // break up the transaction
284            tx.commit();
285        }
286        flushCache();
287        size.set(0);
288    }
289
290    synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
291        return getHead(tx).listNodeIterator(tx);
292    }
293
294    synchronized public boolean isEmpty(final Transaction tx) throws IOException {
295        return getHead(tx).isEmpty(tx);
296    }
297
298    @Override
299    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
300        return getHead(tx).iterator(tx);
301    }
302
303    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
304        return getHead(tx).iterator(tx, initialPosition);
305    }
306
307    synchronized public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException {
308        return getHead(tx).getFirst(tx);
309    }
310
311    synchronized public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
312        return getTail(tx).getLast(tx);
313    }
314
315    private void assertLoaded() throws IllegalStateException {
316        if( !loaded.get() ) {
317            throw new IllegalStateException("TheListIndex is not loaded");
318        }
319    }
320
321    ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
322        Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
323        ListNode<Key, Value> node = page.get();
324        node.setPage(page);
325        node.setContainingList(this);
326        return node;
327    }
328
329    ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
330        ListNode<Key,Value> node = new ListNode<Key,Value>();
331        node.setPage(page);
332        page.set(node);
333        node.setContainingList(this);
334        return node;
335    }
336
337    public ListNode<Key,Value> createNode(Transaction tx) throws IOException {
338        return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null));
339    }
340
341    public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
342        tx.store(node.getPage(), marshaller, overflow);
343        flushCache();
344    }
345
346    public PageFile getPageFile() {
347        return pageFile;
348    }
349
350    public void setPageFile(PageFile pageFile) {
351        this.pageFile = pageFile;
352    }
353
354    public long getHeadPageId() {
355        return headPageId;
356    }
357
358    public void setHeadPageId(long headPageId) {
359        this.headPageId = headPageId;
360    }
361
362    public Marshaller<Key> getKeyMarshaller() {
363        return keyMarshaller;
364    }
365    @Override
366    public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
367        this.keyMarshaller = keyMarshaller;
368    }
369
370    public Marshaller<Value> getValueMarshaller() {
371        return valueMarshaller;
372    }
373    @Override
374    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
375        this.valueMarshaller = valueMarshaller;
376    }
377
378    public void setTailPageId(long tailPageId) {
379        this.tailPageId = tailPageId;
380    }
381
382    public long getTailPageId() {
383       return tailPageId;
384    }
385
386    public long size() {
387        return size.get();
388    }
389
390    private void flushCache() {
391        this.lastGetEntryCache = null;
392        this.lastGetNodeCache = null;
393        this.lastCacheTxSrc.clear();
394    }
395}