001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Iterator;
020    import org.apache.activemq.broker.region.Destination;
021    import org.apache.activemq.broker.region.MessageReference;
022    import org.apache.activemq.command.Message;
023    import org.apache.activemq.command.MessageId;
024    import org.apache.activemq.store.MessageRecoveryListener;
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    /**
029     *  Store based cursor
030     *
031     */
032    public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
033        private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
034        protected final Destination regionDestination;
035        protected final PendingList batchList;
036        private Iterator<MessageReference> iterator = null;
037        protected boolean batchResetNeeded = false;
038        private boolean storeHasMessages = false;
039        protected int size;
040        private MessageId lastCachedId;
041        private boolean hadSpace = false;
042    
043        protected AbstractStoreCursor(Destination destination) {
044            super((destination != null ? destination.isPrioritizedMessages():false));
045            this.regionDestination=destination;
046            if (this.prioritizedMessages) {
047                this.batchList= new PrioritizedPendingList();
048            } else {
049                this.batchList = new OrderedPendingList();
050            }
051        }
052        
053        
054        public final synchronized void start() throws Exception{
055            if (!isStarted()) {
056                super.start();
057                resetBatch();
058                resetSize();
059                setCacheEnabled(!this.storeHasMessages&&useCache);
060            } 
061        }
062    
063        protected void resetSize() {
064            this.size = getStoreSize();
065            this.storeHasMessages=this.size > 0;
066        }
067    
068        public final synchronized void stop() throws Exception {
069            resetBatch();
070            super.stop();
071            gc();
072        }
073    
074        
075        public final boolean recoverMessage(Message message) throws Exception {
076            return recoverMessage(message,false);
077        }
078        
079        public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
080            boolean recovered = false;
081            if (recordUniqueId(message.getMessageId())) {
082                if (!cached) {
083                    message.setRegionDestination(regionDestination);
084                    if( message.getMemoryUsage()==null ) {
085                        message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
086                    }
087                }
088                message.incrementReferenceCount();
089                batchList.addMessageLast(message);
090                clearIterator(true);
091                recovered = true;
092                storeHasMessages = true;
093            } else {
094                /*
095                 * we should expect to get these - as the message is recorded as it before it goes into
096                 * the cache. If subsequently, we pull out that message from the store (before its deleted)
097                 * it will be a duplicate - but should be ignored
098                 */
099                if (LOG.isTraceEnabled()) {
100                    LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
101                }
102            }
103            return recovered;
104        }
105        
106        
107        public final synchronized void reset() {
108            if (batchList.isEmpty()) {
109                try {
110                    fillBatch();
111                } catch (Exception e) {
112                    LOG.error(this + " - Failed to fill batch", e);
113                    throw new RuntimeException(e);
114                }
115            }
116            clearIterator(true);
117            size();
118        }
119        
120        
121        public synchronized void release() {
122            clearIterator(false);
123        }
124        
125        private synchronized void clearIterator(boolean ensureIterator) {
126            boolean haveIterator = this.iterator != null;
127            this.iterator=null;
128            if(haveIterator&&ensureIterator) {
129                ensureIterator();
130            }
131        }
132        
133        private synchronized void ensureIterator() {
134            if(this.iterator==null) {
135                this.iterator=this.batchList.iterator();
136            }
137        }
138    
139    
140        public final void finished() {
141        }
142            
143        
144        public final synchronized boolean hasNext() {
145            if (batchList.isEmpty()) {
146                try {
147                    fillBatch();
148                } catch (Exception e) {
149                    LOG.error(this + " - Failed to fill batch", e);
150                    throw new RuntimeException(e);
151                }
152            }
153            ensureIterator();
154            return this.iterator.hasNext();
155        }
156        
157        
158        public final synchronized MessageReference next() {
159            MessageReference result = null;
160            if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
161                result = this.iterator.next();
162            }
163            last = result;
164            if (result != null) {
165                result.incrementReferenceCount();
166            }
167            return result;
168        }
169        
170        
171        public final synchronized void addMessageLast(MessageReference node) throws Exception {
172            boolean disableCache = false;
173            if (hasSpace()) {
174                if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
175                    if (LOG.isTraceEnabled()) {
176                        LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
177                    }
178                    setCacheEnabled(true);
179                }
180                if (isCacheEnabled()) {
181                    if (recoverMessage(node.getMessage(),true)) {
182                        lastCachedId = node.getMessageId();
183                    } else {
184                        // failed to recover, possible duplicate from concurrent dispatchPending,
185                        // lets not recover further in case of out of order
186                        disableCache = true;
187                    }
188                }
189            } else {
190                disableCache = true;
191            }
192    
193            if (disableCache && isCacheEnabled()) {
194                setCacheEnabled(false);
195                // sync with store on disabling the cache
196                if (lastCachedId != null) {
197                    if (LOG.isTraceEnabled()) {
198                        LOG.trace(this + " - disabling cache"
199                                + ", lastCachedId: " + lastCachedId
200                                + " current node Id: " + node.getMessageId() + " batchList size: " + batchList.size());
201                    }
202                    setBatch(lastCachedId);
203                    lastCachedId = null;
204                }
205            }
206            this.storeHasMessages = true;
207            size++;
208        }
209    
210        protected void setBatch(MessageId messageId) throws Exception {
211        }
212    
213        
214        public final synchronized void addMessageFirst(MessageReference node) throws Exception {
215            setCacheEnabled(false);
216            size++;
217        }
218    
219        
220        public final synchronized void remove() {
221            size--;
222            if (iterator!=null) {
223                iterator.remove();
224            }
225            if (last != null) {
226                last.decrementReferenceCount();
227            }
228        }
229    
230        
231        public final synchronized void remove(MessageReference node) {
232            if (batchList.remove(node) != null) {
233                size--;
234                setCacheEnabled(false);
235            }
236        }
237        
238        
239        public final synchronized void clear() {
240            gc();
241        }
242        
243        
244        public synchronized void gc() {
245            for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
246                MessageReference msg = i.next();
247                rollback(msg.getMessageId());
248                msg.decrementReferenceCount();
249            }
250            batchList.clear();
251            clearIterator(false);
252            batchResetNeeded = true;
253            setCacheEnabled(false);
254        }
255    
256        @Override
257        public boolean hasSpace() {
258            hadSpace = super.hasSpace();
259            return hadSpace;
260        }
261    
262        protected final synchronized void fillBatch() {
263            if (LOG.isTraceEnabled()) {
264                LOG.trace(this + " - fillBatch");
265            }
266            if (batchResetNeeded) {
267                resetSize();
268                setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
269                resetBatch();
270                this.batchResetNeeded = false;
271            }
272            if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
273                try {
274                    doFillBatch();
275                } catch (Exception e) {
276                    LOG.error(this + " - Failed to fill batch", e);
277                    throw new RuntimeException(e);
278                }
279                this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
280            }
281        }
282        
283        
284        public final synchronized boolean isEmpty() {
285            // negative means more messages added to store through queue.send since last reset
286            return size == 0;
287        }
288    
289        
290        public final synchronized boolean hasMessagesBufferedToDeliver() {
291            return !batchList.isEmpty();
292        }
293    
294        
295        public final synchronized int size() {
296            if (size < 0) {
297                this.size = getStoreSize();
298            }
299            return size;
300        }
301    
302        @Override
303        public String toString() {
304            return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
305                        + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
306                        + ",maxBatchSize:" + maxBatchSize;
307        }
308        
309        protected abstract void doFillBatch() throws Exception;
310        
311        protected abstract void resetBatch();
312        
313        protected abstract int getStoreSize();
314        
315        protected abstract boolean isStoreEmpty();
316    }