001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.plist;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.NoSuchElementException;
025 import java.util.Set;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicReference;
028
029 import org.apache.activemq.broker.region.MessageReference;
030 import org.apache.activemq.command.Message;
031 import org.apache.activemq.store.PList;
032 import org.apache.activemq.store.PListEntry;
033 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
034 import org.apache.activemq.store.kahadb.disk.journal.Location;
035 import org.apache.activemq.store.kahadb.disk.page.Transaction;
036 import org.apache.activemq.util.ByteSequence;
037 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
038 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
039 import org.apache.activemq.wireformat.WireFormat;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042
043 public class PListImpl extends ListIndex<String, Location> implements PList {
044 static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
045 final PListStoreImpl store;
046 private String name;
047 Object indexLock;
048
049 PListImpl(PListStoreImpl store) {
050 this.store = store;
051 this.indexLock = store.getIndexLock();
052 setPageFile(store.getPageFile());
053 setKeyMarshaller(StringMarshaller.INSTANCE);
054 setValueMarshaller(LocationMarshaller.INSTANCE);
055 }
056
057 public void setName(String name) {
058 this.name = name;
059 }
060
061 @Override
062 public String getName() {
063 return this.name;
064 }
065
066 void read(DataInput in) throws IOException {
067 setHeadPageId(in.readLong());
068 }
069
070 public void write(DataOutput out) throws IOException {
071 out.writeLong(getHeadPageId());
072 }
073
074 @Override
075 public synchronized void destroy() throws IOException {
076 synchronized (indexLock) {
077 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
078 public void execute(Transaction tx) throws IOException {
079 clear(tx);
080 unload(tx);
081 }
082 });
083 }
084 }
085
086 class Locator {
087 final String id;
088
089 Locator(String id) {
090 this.id = id;
091 }
092
093 PListImpl plist() {
094 return PListImpl.this;
095 }
096 }
097
098 @Override
099 public Object addLast(final String id, final ByteSequence bs) throws IOException {
100 final Location location = this.store.write(bs, false);
101 synchronized (indexLock) {
102 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
103 public void execute(Transaction tx) throws IOException {
104 add(tx, id, location);
105 }
106 });
107 }
108 return new Locator(id);
109 }
110
111 @Override
112 public Object addFirst(final String id, final ByteSequence bs) throws IOException {
113 final Location location = this.store.write(bs, false);
114 synchronized (indexLock) {
115 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
116 public void execute(Transaction tx) throws IOException {
117 addFirst(tx, id, location);
118 }
119 });
120 }
121 return new Locator(id);
122 }
123
124 @Override
125 public boolean remove(final Object l) throws IOException {
126 Locator locator = (Locator) l;
127 assert locator!=null;
128 assert locator.plist()==this;
129 return remove(locator.id);
130 }
131
132 public boolean remove(final String id) throws IOException {
133 final AtomicBoolean result = new AtomicBoolean();
134 synchronized (indexLock) {
135 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
136 public void execute(Transaction tx) throws IOException {
137 result.set(remove(tx, id) != null);
138 }
139 });
140 }
141 return result.get();
142 }
143
144 public boolean remove(final long position) throws IOException {
145 final AtomicBoolean result = new AtomicBoolean();
146 synchronized (indexLock) {
147 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
148 public void execute(Transaction tx) throws IOException {
149 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
150 if (iterator.hasNext()) {
151 iterator.next();
152 iterator.remove();
153 result.set(true);
154 } else {
155 result.set(false);
156 }
157 }
158 });
159 }
160 return result.get();
161 }
162
163 public PListEntry get(final long position) throws IOException {
164 PListEntry result = null;
165 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
166 synchronized (indexLock) {
167 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
168 public void execute(Transaction tx) throws IOException {
169 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
170 ref.set(iterator.next());
171 }
172 });
173 }
174 if (ref.get() != null) {
175 ByteSequence bs = this.store.getPayload(ref.get().getValue());
176 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
177 }
178 return result;
179 }
180
181 public PListEntry getFirst() throws IOException {
182 PListEntry result = null;
183 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
184 synchronized (indexLock) {
185 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
186 public void execute(Transaction tx) throws IOException {
187 ref.set(getFirst(tx));
188 }
189 });
190 }
191 if (ref.get() != null) {
192 ByteSequence bs = this.store.getPayload(ref.get().getValue());
193 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
194 }
195 return result;
196 }
197
198 public PListEntry getLast() throws IOException {
199 PListEntry result = null;
200 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
201 synchronized (indexLock) {
202 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
203 public void execute(Transaction tx) throws IOException {
204 ref.set(getLast(tx));
205 }
206 });
207 }
208 if (ref.get() != null) {
209 ByteSequence bs = this.store.getPayload(ref.get().getValue());
210 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
211 }
212 return result;
213 }
214
215 @Override
216 public boolean isEmpty() {
217 return size() == 0;
218 }
219
220 @Override
221 public PListIterator iterator() throws IOException {
222 return new PListIteratorImpl();
223 }
224
225 final class PListIteratorImpl implements PListIterator {
226 final Iterator<Map.Entry<String, Location>> iterator;
227 final Transaction tx;
228
229 PListIteratorImpl() throws IOException {
230 tx = store.pageFile.tx();
231 synchronized (indexLock) {
232 this.iterator = iterator(tx);
233 }
234 }
235
236 @Override
237 public boolean hasNext() {
238 return iterator.hasNext();
239 }
240
241 @Override
242 public PListEntry next() {
243 Map.Entry<String, Location> entry = iterator.next();
244 ByteSequence bs = null;
245 try {
246 bs = store.getPayload(entry.getValue());
247 } catch (IOException unexpected) {
248 NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
249 e.initCause(unexpected);
250 throw e;
251 }
252 return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey()));
253 }
254
255 @Override
256 public void remove() {
257 try {
258 synchronized (indexLock) {
259 tx.execute(new Transaction.Closure<IOException>() {
260 @Override
261 public void execute(Transaction tx) throws IOException {
262 iterator.remove();
263 }
264 });
265 }
266 } catch (IOException unexpected) {
267 IllegalStateException e = new IllegalStateException(unexpected);
268 e.initCause(unexpected);
269 throw e;
270 }
271 }
272
273 public void release() {
274 try {
275 tx.rollback();
276 } catch (IOException unexpected) {
277 IllegalStateException e = new IllegalStateException(unexpected);
278 e.initCause(unexpected);
279 throw e;
280 }
281 }
282 }
283
284 public void claimFileLocations(final Set<Integer> candidates) throws IOException {
285 synchronized (indexLock) {
286 if (loaded.get()) {
287 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
288 public void execute(Transaction tx) throws IOException {
289 Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
290 while (iterator.hasNext()) {
291 Location location = iterator.next().getValue();
292 candidates.remove(location.getDataFileId());
293 }
294 }
295 });
296 }
297 }
298 }
299
300 @Override
301 public String toString() {
302 return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
303 }
304 }