001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.plist;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.NoSuchElementException;
025 import java.util.Set;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicReference;
028
029 import org.apache.kahadb.index.ListIndex;
030 import org.apache.kahadb.journal.Location;
031 import org.apache.kahadb.page.Transaction;
032 import org.apache.kahadb.util.ByteSequence;
033 import org.apache.kahadb.util.LocationMarshaller;
034 import org.apache.kahadb.util.StringMarshaller;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 public class PList extends ListIndex<String, Location> {
039 static final Logger LOG = LoggerFactory.getLogger(PList.class);
040 final PListStore store;
041 private String name;
042 Object indexLock;
043
044 PList(PListStore store) {
045 this.store = store;
046 this.indexLock = store.getIndexLock();
047 setPageFile(store.getPageFile());
048 setKeyMarshaller(StringMarshaller.INSTANCE);
049 setValueMarshaller(LocationMarshaller.INSTANCE);
050 }
051
052 public void setName(String name) {
053 this.name = name;
054 }
055
056 public String getName() {
057 return this.name;
058 }
059
060 void read(DataInput in) throws IOException {
061 setHeadPageId(in.readLong());
062 }
063
064 public void write(DataOutput out) throws IOException {
065 out.writeLong(getHeadPageId());
066 }
067
068 public synchronized void destroy() throws IOException {
069 synchronized (indexLock) {
070 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
071 public void execute(Transaction tx) throws IOException {
072 clear(tx);
073 unload(tx);
074 }
075 });
076 }
077 }
078
079 public void addLast(final String id, final ByteSequence bs) throws IOException {
080 final Location location = this.store.write(bs, false);
081 synchronized (indexLock) {
082 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
083 public void execute(Transaction tx) throws IOException {
084 add(tx, id, location);
085 }
086 });
087 }
088 }
089
090 public void addFirst(final String id, final ByteSequence bs) throws IOException {
091 final Location location = this.store.write(bs, false);
092 synchronized (indexLock) {
093 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
094 public void execute(Transaction tx) throws IOException {
095 addFirst(tx, id, location);
096 }
097 });
098 }
099 }
100
101 public boolean remove(final String id) throws IOException {
102 final AtomicBoolean result = new AtomicBoolean();
103 synchronized (indexLock) {
104 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
105 public void execute(Transaction tx) throws IOException {
106 result.set(remove(tx, id) != null);
107 }
108 });
109 }
110 return result.get();
111 }
112
113 public boolean remove(final long position) throws IOException {
114 final AtomicBoolean result = new AtomicBoolean();
115 synchronized (indexLock) {
116 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
117 public void execute(Transaction tx) throws IOException {
118 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
119 if (iterator.hasNext()) {
120 iterator.next();
121 iterator.remove();
122 result.set(true);
123 } else {
124 result.set(false);
125 }
126 }
127 });
128 }
129 return result.get();
130 }
131
132 public PListEntry get(final long position) throws IOException {
133 PListEntry result = null;
134 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
135 synchronized (indexLock) {
136 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
137 public void execute(Transaction tx) throws IOException {
138 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
139 ref.set(iterator.next());
140 }
141 });
142 }
143 if (ref.get() != null) {
144 ByteSequence bs = this.store.getPayload(ref.get().getValue());
145 result = new PListEntry(ref.get().getKey(), bs);
146 }
147 return result;
148 }
149
150 public PListEntry getFirst() throws IOException {
151 PListEntry result = null;
152 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
153 synchronized (indexLock) {
154 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
155 public void execute(Transaction tx) throws IOException {
156 ref.set(getFirst(tx));
157 }
158 });
159 }
160 if (ref.get() != null) {
161 ByteSequence bs = this.store.getPayload(ref.get().getValue());
162 result = new PListEntry(ref.get().getKey(), bs);
163 }
164 return result;
165 }
166
167 public PListEntry getLast() throws IOException {
168 PListEntry result = null;
169 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
170 synchronized (indexLock) {
171 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
172 public void execute(Transaction tx) throws IOException {
173 ref.set(getLast(tx));
174 }
175 });
176 }
177 if (ref.get() != null) {
178 ByteSequence bs = this.store.getPayload(ref.get().getValue());
179 result = new PListEntry(ref.get().getKey(), bs);
180 }
181 return result;
182 }
183
184 public boolean isEmpty() {
185 return size() == 0;
186 }
187
188 public PListIterator iterator() throws IOException {
189 return new PListIterator();
190 }
191
192 public final class PListIterator implements Iterator<PListEntry> {
193 final Iterator<Map.Entry<String, Location>> iterator;
194 final Transaction tx;
195
196 PListIterator() throws IOException {
197 tx = store.pageFile.tx();
198 synchronized (indexLock) {
199 this.iterator = iterator(tx);
200 }
201 }
202
203 @Override
204 public boolean hasNext() {
205 return iterator.hasNext();
206 }
207
208 @Override
209 public PListEntry next() {
210 Map.Entry<String, Location> entry = iterator.next();
211 ByteSequence bs = null;
212 try {
213 bs = store.getPayload(entry.getValue());
214 } catch (IOException unexpected) {
215 NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
216 e.initCause(unexpected);
217 throw e;
218 }
219 return new PListEntry(entry.getKey(), bs);
220 }
221
222 @Override
223 public void remove() {
224 try {
225 synchronized (indexLock) {
226 tx.execute(new Transaction.Closure<IOException>() {
227 @Override
228 public void execute(Transaction tx) throws IOException {
229 iterator.remove();
230 }
231 });
232 }
233 } catch (IOException unexpected) {
234 IllegalStateException e = new IllegalStateException(unexpected);
235 e.initCause(unexpected);
236 throw e;
237 }
238 }
239
240 public void release() {
241 try {
242 tx.rollback();
243 } catch (IOException unexpected) {
244 IllegalStateException e = new IllegalStateException(unexpected);
245 e.initCause(unexpected);
246 throw e;
247 }
248 }
249 }
250
251 public void claimFileLocations(final Set<Integer> candidates) throws IOException {
252 synchronized (indexLock) {
253 if (loaded.get()) {
254 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
255 public void execute(Transaction tx) throws IOException {
256 Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
257 while (iterator.hasNext()) {
258 Location location = iterator.next().getValue();
259 candidates.remove(location.getDataFileId());
260 }
261 }
262 });
263 }
264 }
265 }
266
267 @Override
268 public String toString() {
269 return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
270 }
271 }