001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import org.apache.activemq.broker.Broker;
020    import org.apache.activemq.broker.region.MessageReference;
021    import org.apache.activemq.broker.region.Queue;
022    import org.apache.activemq.command.Message;
023    import org.apache.activemq.usage.SystemUsage;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    /**
028     * Store based Cursor for Queues
029     */
030    public class StoreQueueCursor extends AbstractPendingMessageCursor {
031    
032        private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
033        private final Broker broker;
034        private int pendingCount;
035        private final Queue queue;
036        private PendingMessageCursor nonPersistent;
037        private final QueueStorePrefetch persistent;
038        private boolean started;
039        private PendingMessageCursor currentCursor;
040    
041        /**
042         * Construct
043         * @param broker
044         * @param queue
045         */
046        public StoreQueueCursor(Broker broker,Queue queue) {
047            super((queue != null ? queue.isPrioritizedMessages():false));
048            this.broker=broker;
049            this.queue = queue;
050            this.persistent = new QueueStorePrefetch(queue);
051            currentCursor = persistent;
052        }
053    
054        public synchronized void start() throws Exception {
055            started = true;
056            super.start();
057            if (nonPersistent == null) {
058                if (broker.getBrokerService().isPersistent()) {
059                    nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
060                }else {
061                    nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
062                }
063                nonPersistent.setMaxBatchSize(getMaxBatchSize());
064                nonPersistent.setSystemUsage(systemUsage);
065                nonPersistent.setEnableAudit(isEnableAudit());
066                nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
067                nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
068            }
069            nonPersistent.setMessageAudit(getMessageAudit());
070            nonPersistent.start();
071            persistent.setMessageAudit(getMessageAudit());
072            persistent.start();
073            pendingCount = persistent.size() + nonPersistent.size();
074        }
075    
076        public synchronized void stop() throws Exception {
077            started = false;
078            if (nonPersistent != null) {
079    //            nonPersistent.clear();
080    //            nonPersistent.stop();
081    //            nonPersistent.gc();
082              nonPersistent.destroy();
083            }
084            persistent.stop();
085            persistent.gc();
086            super.stop();
087            pendingCount = 0;
088        }
089    
090        public synchronized void addMessageLast(MessageReference node) throws Exception {
091            if (node != null) {
092                Message msg = node.getMessage();
093                if (started) {
094                    pendingCount++;
095                    if (!msg.isPersistent()) {
096                        nonPersistent.addMessageLast(node);
097                    }
098                }
099                if (msg.isPersistent()) {
100                    persistent.addMessageLast(node);
101                }
102            }
103        }
104    
105        public synchronized void addMessageFirst(MessageReference node) throws Exception {
106            if (node != null) {
107                Message msg = node.getMessage();
108                if (started) {
109                    pendingCount++;
110                    if (!msg.isPersistent()) {
111                        nonPersistent.addMessageFirst(node);
112                    }
113                }
114                if (msg.isPersistent()) {
115                    persistent.addMessageFirst(node);
116                }
117            }
118        }
119    
120        public synchronized void clear() {
121            pendingCount = 0;
122        }
123    
124        public synchronized boolean hasNext() {
125            try {
126                getNextCursor();
127            } catch (Exception e) {
128                LOG.error("Failed to get current cursor ", e);
129                throw new RuntimeException(e);
130           }
131           return currentCursor != null ? currentCursor.hasNext() : false;
132        }
133    
134        public synchronized MessageReference next() {
135            MessageReference result = currentCursor != null ? currentCursor.next() : null;
136            return result;
137        }
138    
139        public synchronized void remove() {
140            if (currentCursor != null) {
141                currentCursor.remove();
142            }
143            pendingCount--;
144        }
145    
146        public synchronized void remove(MessageReference node) {
147            if (!node.isPersistent()) {
148                nonPersistent.remove(node);
149            } else {
150                persistent.remove(node);
151            }
152            pendingCount--;
153        }
154    
155        public synchronized void reset() {
156            nonPersistent.reset();
157            persistent.reset();
158            pendingCount = persistent.size() + nonPersistent.size();
159        }
160    
161        public void release() {
162            nonPersistent.release();
163            persistent.release();
164        }
165    
166    
167        public synchronized int size() {
168            if (pendingCount < 0) {
169                pendingCount = persistent.size() + nonPersistent.size();
170            }
171            return pendingCount;
172        }
173    
174        public synchronized boolean isEmpty() {
175            // if negative, more messages arrived in store since last reset so non empty
176            return pendingCount == 0;
177        }
178    
179        /**
180         * Informs the Broker if the subscription needs to intervention to recover
181         * it's state e.g. DurableTopicSubscriber may do
182         *
183         * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
184         * @return true if recovery required
185         */
186        public boolean isRecoveryRequired() {
187            return false;
188        }
189    
190        /**
191         * @return the nonPersistent Cursor
192         */
193        public PendingMessageCursor getNonPersistent() {
194            return this.nonPersistent;
195        }
196    
197        /**
198         * @param nonPersistent cursor to set
199         */
200        public void setNonPersistent(PendingMessageCursor nonPersistent) {
201            this.nonPersistent = nonPersistent;
202        }
203    
204        public void setMaxBatchSize(int maxBatchSize) {
205            persistent.setMaxBatchSize(maxBatchSize);
206            if (nonPersistent != null) {
207                nonPersistent.setMaxBatchSize(maxBatchSize);
208            }
209            super.setMaxBatchSize(maxBatchSize);
210        }
211    
212    
213        public void setMaxProducersToAudit(int maxProducersToAudit) {
214            super.setMaxProducersToAudit(maxProducersToAudit);
215            if (persistent != null) {
216                persistent.setMaxProducersToAudit(maxProducersToAudit);
217            }
218            if (nonPersistent != null) {
219                nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
220            }
221        }
222    
223        public void setMaxAuditDepth(int maxAuditDepth) {
224            super.setMaxAuditDepth(maxAuditDepth);
225            if (persistent != null) {
226                persistent.setMaxAuditDepth(maxAuditDepth);
227            }
228            if (nonPersistent != null) {
229                nonPersistent.setMaxAuditDepth(maxAuditDepth);
230            }
231        }
232    
233        public void setEnableAudit(boolean enableAudit) {
234            super.setEnableAudit(enableAudit);
235            if (persistent != null) {
236                persistent.setEnableAudit(enableAudit);
237            }
238            if (nonPersistent != null) {
239                nonPersistent.setEnableAudit(enableAudit);
240            }
241        }
242    
243        @Override
244        public void setUseCache(boolean useCache) {
245            super.setUseCache(useCache);
246            if (persistent != null) {
247                persistent.setUseCache(useCache);
248            }
249            if (nonPersistent != null) {
250                nonPersistent.setUseCache(useCache);
251            }
252        }
253    
254        @Override
255        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
256            super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
257            if (persistent != null) {
258                persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
259            }
260            if (nonPersistent != null) {
261                nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
262            }
263        }
264    
265    
266    
267        public synchronized void gc() {
268            if (persistent != null) {
269                persistent.gc();
270            }
271            if (nonPersistent != null) {
272                nonPersistent.gc();
273            }
274            pendingCount = persistent.size() + nonPersistent.size();
275        }
276    
277        public void setSystemUsage(SystemUsage usageManager) {
278            super.setSystemUsage(usageManager);
279            if (persistent != null) {
280                persistent.setSystemUsage(usageManager);
281            }
282            if (nonPersistent != null) {
283                nonPersistent.setSystemUsage(usageManager);
284            }
285        }
286    
287        protected synchronized PendingMessageCursor getNextCursor() throws Exception {
288            if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
289                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
290                // sanity check
291                if (currentCursor.isEmpty()) {
292                    currentCursor = currentCursor == persistent ? nonPersistent : persistent;
293                }
294            }
295            return currentCursor;
296        }
297    
298        @Override
299        public boolean isCacheEnabled() {
300            boolean cacheEnabled = isUseCache();
301            if (cacheEnabled) {
302                if (persistent != null) {
303                    cacheEnabled &= persistent.isCacheEnabled();
304                }
305                if (nonPersistent != null) {
306                    cacheEnabled &= nonPersistent.isCacheEnabled();
307                }
308                setCacheEnabled(cacheEnabled);
309            }
310            return cacheEnabled;
311        }
312    }