001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.Iterator;
020 import org.apache.activemq.broker.region.Destination;
021 import org.apache.activemq.broker.region.MessageReference;
022 import org.apache.activemq.command.Message;
023 import org.apache.activemq.command.MessageId;
024 import org.apache.activemq.store.MessageRecoveryListener;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 /**
029 * Store based cursor
030 *
031 */
032 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
033 private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
034 protected final Destination regionDestination;
035 protected final PendingList batchList;
036 private Iterator<MessageReference> iterator = null;
037 protected boolean batchResetNeeded = false;
038 private boolean storeHasMessages = false;
039 protected int size;
040 private MessageId lastCachedId;
041 private boolean hadSpace = false;
042
043 protected AbstractStoreCursor(Destination destination) {
044 super((destination != null ? destination.isPrioritizedMessages():false));
045 this.regionDestination=destination;
046 if (this.prioritizedMessages) {
047 this.batchList= new PrioritizedPendingList();
048 } else {
049 this.batchList = new OrderedPendingList();
050 }
051 }
052
053
054 public final synchronized void start() throws Exception{
055 if (!isStarted()) {
056 super.start();
057 resetBatch();
058 resetSize();
059 setCacheEnabled(!this.storeHasMessages&&useCache);
060 }
061 }
062
063 protected void resetSize() {
064 this.size = getStoreSize();
065 this.storeHasMessages=this.size > 0;
066 }
067
068 public final synchronized void stop() throws Exception {
069 resetBatch();
070 super.stop();
071 gc();
072 }
073
074
075 public final boolean recoverMessage(Message message) throws Exception {
076 return recoverMessage(message,false);
077 }
078
079 public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
080 boolean recovered = false;
081 if (recordUniqueId(message.getMessageId())) {
082 if (!cached) {
083 message.setRegionDestination(regionDestination);
084 if( message.getMemoryUsage()==null ) {
085 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
086 }
087 }
088 message.incrementReferenceCount();
089 batchList.addMessageLast(message);
090 clearIterator(true);
091 recovered = true;
092 storeHasMessages = true;
093 } else {
094 /*
095 * we should expect to get these - as the message is recorded as it before it goes into
096 * the cache. If subsequently, we pull out that message from the store (before its deleted)
097 * it will be a duplicate - but should be ignored
098 */
099 if (LOG.isTraceEnabled()) {
100 LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
101 }
102 }
103 return recovered;
104 }
105
106
107 public final synchronized void reset() {
108 if (batchList.isEmpty()) {
109 try {
110 fillBatch();
111 } catch (Exception e) {
112 LOG.error(this + " - Failed to fill batch", e);
113 throw new RuntimeException(e);
114 }
115 }
116 clearIterator(true);
117 size();
118 }
119
120
121 public synchronized void release() {
122 clearIterator(false);
123 }
124
125 private synchronized void clearIterator(boolean ensureIterator) {
126 boolean haveIterator = this.iterator != null;
127 this.iterator=null;
128 if(haveIterator&&ensureIterator) {
129 ensureIterator();
130 }
131 }
132
133 private synchronized void ensureIterator() {
134 if(this.iterator==null) {
135 this.iterator=this.batchList.iterator();
136 }
137 }
138
139
140 public final void finished() {
141 }
142
143
144 public final synchronized boolean hasNext() {
145 if (batchList.isEmpty()) {
146 try {
147 fillBatch();
148 } catch (Exception e) {
149 LOG.error(this + " - Failed to fill batch", e);
150 throw new RuntimeException(e);
151 }
152 }
153 ensureIterator();
154 return this.iterator.hasNext();
155 }
156
157
158 public final synchronized MessageReference next() {
159 MessageReference result = null;
160 if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
161 result = this.iterator.next();
162 }
163 last = result;
164 if (result != null) {
165 result.incrementReferenceCount();
166 }
167 return result;
168 }
169
170
171 public final synchronized void addMessageLast(MessageReference node) throws Exception {
172 boolean disableCache = false;
173 if (hasSpace()) {
174 if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
175 if (LOG.isTraceEnabled()) {
176 LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
177 }
178 setCacheEnabled(true);
179 }
180 if (isCacheEnabled()) {
181 if (recoverMessage(node.getMessage(),true)) {
182 lastCachedId = node.getMessageId();
183 } else {
184 // failed to recover, possible duplicate from concurrent dispatchPending,
185 // lets not recover further in case of out of order
186 disableCache = true;
187 }
188 }
189 } else {
190 disableCache = true;
191 }
192
193 if (disableCache && isCacheEnabled()) {
194 setCacheEnabled(false);
195 // sync with store on disabling the cache
196 if (lastCachedId != null) {
197 if (LOG.isTraceEnabled()) {
198 LOG.trace(this + " - disabling cache"
199 + ", lastCachedId: " + lastCachedId
200 + " current node Id: " + node.getMessageId() + " batchList size: " + batchList.size());
201 }
202 setBatch(lastCachedId);
203 lastCachedId = null;
204 }
205 }
206 this.storeHasMessages = true;
207 size++;
208 }
209
210 protected void setBatch(MessageId messageId) throws Exception {
211 }
212
213
214 public final synchronized void addMessageFirst(MessageReference node) throws Exception {
215 setCacheEnabled(false);
216 size++;
217 }
218
219
220 public final synchronized void remove() {
221 size--;
222 if (iterator!=null) {
223 iterator.remove();
224 }
225 if (last != null) {
226 last.decrementReferenceCount();
227 }
228 }
229
230
231 public final synchronized void remove(MessageReference node) {
232 if (batchList.remove(node) != null) {
233 size--;
234 setCacheEnabled(false);
235 }
236 }
237
238
239 public final synchronized void clear() {
240 gc();
241 }
242
243
244 public synchronized void gc() {
245 for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
246 MessageReference msg = i.next();
247 rollback(msg.getMessageId());
248 msg.decrementReferenceCount();
249 }
250 batchList.clear();
251 clearIterator(false);
252 batchResetNeeded = true;
253 setCacheEnabled(false);
254 }
255
256 @Override
257 public boolean hasSpace() {
258 hadSpace = super.hasSpace();
259 return hadSpace;
260 }
261
262 protected final synchronized void fillBatch() {
263 if (LOG.isTraceEnabled()) {
264 LOG.trace(this + " - fillBatch");
265 }
266 if (batchResetNeeded) {
267 resetSize();
268 setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
269 resetBatch();
270 this.batchResetNeeded = false;
271 }
272 if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
273 try {
274 doFillBatch();
275 } catch (Exception e) {
276 LOG.error(this + " - Failed to fill batch", e);
277 throw new RuntimeException(e);
278 }
279 this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
280 }
281 }
282
283
284 public final synchronized boolean isEmpty() {
285 // negative means more messages added to store through queue.send since last reset
286 return size == 0;
287 }
288
289
290 public final synchronized boolean hasMessagesBufferedToDeliver() {
291 return !batchList.isEmpty();
292 }
293
294
295 public final synchronized int size() {
296 if (size < 0) {
297 this.size = getStoreSize();
298 }
299 return size;
300 }
301
302 @Override
303 public String toString() {
304 return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
305 + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
306 + ",maxBatchSize:" + maxBatchSize;
307 }
308
309 protected abstract void doFillBatch() throws Exception;
310
311 protected abstract void resetBatch();
312
313 protected abstract int getStoreSize();
314
315 protected abstract boolean isStoreEmpty();
316 }