001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.NoSuchElementException;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicReference;
028    
029    import org.apache.kahadb.index.ListIndex;
030    import org.apache.kahadb.journal.Location;
031    import org.apache.kahadb.page.Transaction;
032    import org.apache.kahadb.util.ByteSequence;
033    import org.apache.kahadb.util.LocationMarshaller;
034    import org.apache.kahadb.util.StringMarshaller;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    public class PList extends ListIndex<String, Location> {
039        static final Logger LOG = LoggerFactory.getLogger(PList.class);
040        final PListStore store;
041        private String name;
042        Object indexLock;
043    
044        PList(PListStore store) {
045            this.store = store;
046            this.indexLock = store.getIndexLock();
047            setPageFile(store.getPageFile());
048            setKeyMarshaller(StringMarshaller.INSTANCE);
049            setValueMarshaller(LocationMarshaller.INSTANCE);
050        }
051    
052        public void setName(String name) {
053            this.name = name;
054        }
055    
056        public String getName() {
057            return this.name;
058        }
059    
060        void read(DataInput in) throws IOException {
061            setHeadPageId(in.readLong());
062        }
063    
064        public void write(DataOutput out) throws IOException {
065            out.writeLong(getHeadPageId());
066        }
067    
068        public synchronized void destroy() throws IOException {
069            synchronized (indexLock) {
070                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
071                    public void execute(Transaction tx) throws IOException {
072                        clear(tx);
073                        unload(tx);
074                    }
075                });
076            }
077        }
078    
079        public void addLast(final String id, final ByteSequence bs) throws IOException {
080            final Location location = this.store.write(bs, false);
081            synchronized (indexLock) {
082                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
083                    public void execute(Transaction tx) throws IOException {
084                        add(tx, id, location);
085                    }
086                });
087            }
088        }
089    
090        public void addFirst(final String id, final ByteSequence bs) throws IOException {
091            final Location location = this.store.write(bs, false);
092            synchronized (indexLock) {
093                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
094                    public void execute(Transaction tx) throws IOException {
095                        addFirst(tx, id, location);
096                    }
097                });
098            }
099        }
100    
101        public boolean remove(final String id) throws IOException {
102            final AtomicBoolean result = new AtomicBoolean();
103            synchronized (indexLock) {
104                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
105                    public void execute(Transaction tx) throws IOException {
106                        result.set(remove(tx, id) != null);
107                    }
108                });
109            }
110            return result.get();
111        }
112    
113        public boolean remove(final long position) throws IOException {
114            final AtomicBoolean result = new AtomicBoolean();
115            synchronized (indexLock) {
116                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
117                    public void execute(Transaction tx) throws IOException {
118                        Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
119                        if (iterator.hasNext()) {
120                            iterator.next();
121                            iterator.remove();
122                            result.set(true);
123                        } else {
124                            result.set(false);
125                        }
126                    }
127                });
128            }
129            return result.get();
130        }
131    
132        public PListEntry get(final long position) throws IOException {
133            PListEntry result = null;
134            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
135            synchronized (indexLock) {
136                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
137                    public void execute(Transaction tx) throws IOException {
138                        Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
139                        ref.set(iterator.next());
140                    }
141                });
142            }
143            if (ref.get() != null) {
144                ByteSequence bs = this.store.getPayload(ref.get().getValue());
145                result = new PListEntry(ref.get().getKey(), bs);
146            }
147            return result;
148        }
149    
150        public PListEntry getFirst() throws IOException {
151            PListEntry result = null;
152            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
153            synchronized (indexLock) {
154                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
155                    public void execute(Transaction tx) throws IOException {
156                        ref.set(getFirst(tx));
157                    }
158                });
159            }
160            if (ref.get() != null) {
161                ByteSequence bs = this.store.getPayload(ref.get().getValue());
162                result = new PListEntry(ref.get().getKey(), bs);
163            }
164            return result;
165        }
166    
167        public PListEntry getLast() throws IOException {
168            PListEntry result = null;
169            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
170            synchronized (indexLock) {
171                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
172                    public void execute(Transaction tx) throws IOException {
173                        ref.set(getLast(tx));
174                    }
175                });
176            }
177            if (ref.get() != null) {
178                ByteSequence bs = this.store.getPayload(ref.get().getValue());
179                result = new PListEntry(ref.get().getKey(), bs);
180            }
181            return result;
182        }
183    
184        public boolean isEmpty() {
185            return size() == 0;
186        }
187    
188        public PListIterator iterator() throws IOException {
189            return new PListIterator();
190        }
191    
192        public final class PListIterator implements Iterator<PListEntry> {
193            final Iterator<Map.Entry<String, Location>> iterator;
194            final Transaction tx;
195    
196            PListIterator() throws IOException {
197                tx = store.pageFile.tx();
198                synchronized (indexLock) {
199                    this.iterator = iterator(tx);
200                }
201            }
202    
203            @Override
204            public boolean hasNext() {
205                return iterator.hasNext();
206            }
207    
208            @Override
209            public PListEntry next() {
210                Map.Entry<String, Location> entry = iterator.next();
211                ByteSequence bs = null;
212                try {
213                    bs = store.getPayload(entry.getValue());
214                } catch (IOException unexpected) {
215                    NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
216                    e.initCause(unexpected);
217                    throw e;
218                }
219                return new PListEntry(entry.getKey(), bs);
220            }
221    
222            @Override
223            public void remove() {
224                try {
225                    synchronized (indexLock) {
226                        tx.execute(new Transaction.Closure<IOException>() {
227                            @Override
228                            public void execute(Transaction tx) throws IOException {
229                                iterator.remove();
230                            }
231                        });
232                    }
233                } catch (IOException unexpected) {
234                    IllegalStateException e = new IllegalStateException(unexpected);
235                    e.initCause(unexpected);
236                    throw e;
237                }
238            }
239    
240            public void release() {
241                try {
242                    tx.rollback();
243                } catch (IOException unexpected) {
244                    IllegalStateException e = new IllegalStateException(unexpected);
245                    e.initCause(unexpected);
246                    throw e;
247                }
248            }
249        }
250    
251        public void claimFileLocations(final Set<Integer> candidates) throws IOException {
252            synchronized (indexLock) {
253                if (loaded.get()) {
254                    this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
255                        public void execute(Transaction tx) throws IOException {
256                            Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
257                            while (iterator.hasNext()) {
258                                Location location = iterator.next().getValue();
259                                candidates.remove(location.getDataFileId());
260                            }
261                        }
262                    });
263                }
264            }
265        }
266    
267        @Override
268        public String toString() {
269            return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
270        }
271    }