001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.NoSuchElementException;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicReference;
028    
029    import org.apache.activemq.broker.region.MessageReference;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.store.PList;
032    import org.apache.activemq.store.PListEntry;
033    import org.apache.activemq.store.kahadb.disk.index.ListIndex;
034    import org.apache.activemq.store.kahadb.disk.journal.Location;
035    import org.apache.activemq.store.kahadb.disk.page.Transaction;
036    import org.apache.activemq.util.ByteSequence;
037    import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
038    import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
039    import org.apache.activemq.wireformat.WireFormat;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    public class PListImpl extends ListIndex<String, Location> implements PList {
044        static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
045        final PListStoreImpl store;
046        private String name;
047        Object indexLock;
048    
049        PListImpl(PListStoreImpl store) {
050            this.store = store;
051            this.indexLock = store.getIndexLock();
052            setPageFile(store.getPageFile());
053            setKeyMarshaller(StringMarshaller.INSTANCE);
054            setValueMarshaller(LocationMarshaller.INSTANCE);
055        }
056    
057        public void setName(String name) {
058            this.name = name;
059        }
060    
061        @Override
062        public String getName() {
063            return this.name;
064        }
065    
066        void read(DataInput in) throws IOException {
067            setHeadPageId(in.readLong());
068        }
069    
070        public void write(DataOutput out) throws IOException {
071            out.writeLong(getHeadPageId());
072        }
073    
074        @Override
075        public synchronized void destroy() throws IOException {
076            synchronized (indexLock) {
077                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
078                    public void execute(Transaction tx) throws IOException {
079                        clear(tx);
080                        unload(tx);
081                    }
082                });
083            }
084        }
085    
086        class Locator {
087            final String id;
088    
089            Locator(String id) {
090                this.id = id;
091            }
092    
093            PListImpl plist() {
094                return PListImpl.this;
095            }
096        }
097    
098        @Override
099        public Object addLast(final String id, final ByteSequence bs) throws IOException {
100            final Location location = this.store.write(bs, false);
101            synchronized (indexLock) {
102                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
103                    public void execute(Transaction tx) throws IOException {
104                        add(tx, id, location);
105                    }
106                });
107            }
108            return new Locator(id);
109        }
110    
111        @Override
112        public Object addFirst(final String id, final ByteSequence bs) throws IOException {
113            final Location location = this.store.write(bs, false);
114            synchronized (indexLock) {
115                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
116                    public void execute(Transaction tx) throws IOException {
117                        addFirst(tx, id, location);
118                    }
119                });
120            }
121            return new Locator(id);
122        }
123    
124        @Override
125        public boolean remove(final Object l) throws IOException {
126            Locator locator = (Locator) l;
127            assert locator!=null;
128            assert locator.plist()==this;
129            return remove(locator.id);
130        }
131    
132        public boolean remove(final String id) throws IOException {
133            final AtomicBoolean result = new AtomicBoolean();
134            synchronized (indexLock) {
135                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
136                    public void execute(Transaction tx) throws IOException {
137                        result.set(remove(tx, id) != null);
138                    }
139                });
140            }
141            return result.get();
142        }
143    
144        public boolean remove(final long position) throws IOException {
145            final AtomicBoolean result = new AtomicBoolean();
146            synchronized (indexLock) {
147                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
148                    public void execute(Transaction tx) throws IOException {
149                        Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
150                        if (iterator.hasNext()) {
151                            iterator.next();
152                            iterator.remove();
153                            result.set(true);
154                        } else {
155                            result.set(false);
156                        }
157                    }
158                });
159            }
160            return result.get();
161        }
162    
163        public PListEntry get(final long position) throws IOException {
164            PListEntry result = null;
165            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
166            synchronized (indexLock) {
167                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
168                    public void execute(Transaction tx) throws IOException {
169                        Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
170                        ref.set(iterator.next());
171                    }
172                });
173            }
174            if (ref.get() != null) {
175                ByteSequence bs = this.store.getPayload(ref.get().getValue());
176                result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
177            }
178            return result;
179        }
180    
181        public PListEntry getFirst() throws IOException {
182            PListEntry result = null;
183            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
184            synchronized (indexLock) {
185                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
186                    public void execute(Transaction tx) throws IOException {
187                        ref.set(getFirst(tx));
188                    }
189                });
190            }
191            if (ref.get() != null) {
192                ByteSequence bs = this.store.getPayload(ref.get().getValue());
193                result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
194            }
195            return result;
196        }
197    
198        public PListEntry getLast() throws IOException {
199            PListEntry result = null;
200            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
201            synchronized (indexLock) {
202                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
203                    public void execute(Transaction tx) throws IOException {
204                        ref.set(getLast(tx));
205                    }
206                });
207            }
208            if (ref.get() != null) {
209                ByteSequence bs = this.store.getPayload(ref.get().getValue());
210                result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
211            }
212            return result;
213        }
214    
215        @Override
216        public boolean isEmpty() {
217            return size() == 0;
218        }
219    
220        @Override
221        public PListIterator iterator() throws IOException {
222            return new PListIteratorImpl();
223        }
224    
225        final class PListIteratorImpl implements PListIterator {
226            final Iterator<Map.Entry<String, Location>> iterator;
227            final Transaction tx;
228    
229            PListIteratorImpl() throws IOException {
230                tx = store.pageFile.tx();
231                synchronized (indexLock) {
232                    this.iterator = iterator(tx);
233                }
234            }
235    
236            @Override
237            public boolean hasNext() {
238                return iterator.hasNext();
239            }
240    
241            @Override
242            public PListEntry next() {
243                Map.Entry<String, Location> entry = iterator.next();
244                ByteSequence bs = null;
245                try {
246                    bs = store.getPayload(entry.getValue());
247                } catch (IOException unexpected) {
248                    NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
249                    e.initCause(unexpected);
250                    throw e;
251                }
252                return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey()));
253            }
254    
255            @Override
256            public void remove() {
257                try {
258                    synchronized (indexLock) {
259                        tx.execute(new Transaction.Closure<IOException>() {
260                            @Override
261                            public void execute(Transaction tx) throws IOException {
262                                iterator.remove();
263                            }
264                        });
265                    }
266                } catch (IOException unexpected) {
267                    IllegalStateException e = new IllegalStateException(unexpected);
268                    e.initCause(unexpected);
269                    throw e;
270                }
271            }
272    
273            public void release() {
274                try {
275                    tx.rollback();
276                } catch (IOException unexpected) {
277                    IllegalStateException e = new IllegalStateException(unexpected);
278                    e.initCause(unexpected);
279                    throw e;
280                }
281            }
282        }
283    
284        public void claimFileLocations(final Set<Integer> candidates) throws IOException {
285            synchronized (indexLock) {
286                if (loaded.get()) {
287                    this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
288                        public void execute(Transaction tx) throws IOException {
289                            Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
290                            while (iterator.hasNext()) {
291                                Location location = iterator.next().getValue();
292                                candidates.remove(location.getDataFileId());
293                            }
294                        }
295                    });
296                }
297            }
298        }
299    
300        @Override
301        public String toString() {
302            return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
303        }
304    }