001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Collections;
020    import java.util.LinkedList;
021    import java.util.List;
022    import java.util.Set;
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.BaseDestination;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.Subscription;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.usage.SystemUsage;
032    
033    /**
034     * Abstract method holder for pending message (messages awaiting disptach to a
035     * consumer) cursor
036     * 
037     * 
038     */
039    public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040        protected int memoryUsageHighWaterMark = 70;
041        protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042        protected SystemUsage systemUsage;
043        protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044        protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045        protected boolean enableAudit=true;
046        protected ActiveMQMessageAudit audit;
047        protected boolean useCache=true;
048        private boolean cacheEnabled=true;
049        private boolean started=false;
050        protected MessageReference last = null;
051        protected final boolean prioritizedMessages;
052        
053        public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054            this.prioritizedMessages=prioritizedMessages;
055        }
056      
057    
058        public synchronized void start() throws Exception  {
059            if (!started && enableAudit && audit==null) {
060                audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061            }
062            started=true;
063        }
064    
065        public synchronized void stop() throws Exception  {
066            started=false;
067            gc();
068        }
069    
070        public void add(ConnectionContext context, Destination destination) throws Exception {
071        }
072    
073        @SuppressWarnings("unchecked")
074        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
075            return Collections.EMPTY_LIST;
076        }
077    
078        public boolean isRecoveryRequired() {
079            return true;
080        }
081    
082        public void addMessageFirst(MessageReference node) throws Exception {
083        }
084    
085        public void addMessageLast(MessageReference node) throws Exception {
086        }
087        
088        public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
089            addMessageLast(node);
090            return true;
091        }
092    
093        public void addRecoveredMessage(MessageReference node) throws Exception {
094            addMessageLast(node);
095        }
096    
097        public void clear() {
098        }
099    
100        public boolean hasNext() {
101            return false;
102        }
103    
104        public boolean isEmpty() {
105            return false;
106        }
107    
108        public boolean isEmpty(Destination destination) {
109            return isEmpty();
110        }
111    
112        public MessageReference next() {
113            return null;
114        }
115    
116        public void remove() {
117        }
118    
119        public void reset() {
120        }
121    
122        public int size() {
123            return 0;
124        }
125    
126        public int getMaxBatchSize() {
127            return maxBatchSize;
128        }
129    
130        public void setMaxBatchSize(int maxBatchSize) {
131            this.maxBatchSize = maxBatchSize;
132        }
133    
134        protected void fillBatch() throws Exception {
135        }
136    
137        public void resetForGC() {
138            reset();
139        }
140    
141        public void remove(MessageReference node) {
142        }
143    
144        public void gc() {
145        }
146    
147        public void setSystemUsage(SystemUsage usageManager) {
148            this.systemUsage = usageManager;
149        }
150    
151        public boolean hasSpace() {
152            return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
153        }
154    
155        public boolean isFull() {
156            return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
157        }
158    
159        public void release() {
160        }
161    
162        public boolean hasMessagesBufferedToDeliver() {
163            return false;
164        }
165    
166        /**
167         * @return the memoryUsageHighWaterMark
168         */
169        public int getMemoryUsageHighWaterMark() {
170            return memoryUsageHighWaterMark;
171        }
172    
173        /**
174         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
175         */
176        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
177            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
178        }
179    
180        /**
181         * @return the usageManager
182         */
183        public SystemUsage getSystemUsage() {
184            return this.systemUsage;
185        }
186    
187        /**
188         * destroy the cursor
189         * 
190         * @throws Exception
191         */
192        public void destroy() throws Exception {
193            stop();
194        }
195    
196        /**
197         * Page in a restricted number of messages
198         * 
199         * @param maxItems maximum number of messages to return
200         * @return a list of paged in messages
201         */
202        public LinkedList<MessageReference> pageInList(int maxItems) {
203            throw new RuntimeException("Not supported");
204        }
205    
206        /**
207         * @return the maxProducersToAudit
208         */
209        public int getMaxProducersToAudit() {
210            return maxProducersToAudit;
211        }
212    
213        /**
214         * @param maxProducersToAudit the maxProducersToAudit to set
215         */
216        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
217            this.maxProducersToAudit = maxProducersToAudit;
218            if (audit != null) {
219                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
220            }
221        }
222    
223        /**
224         * @return the maxAuditDepth
225         */
226        public int getMaxAuditDepth() {
227            return maxAuditDepth;
228        }
229        
230    
231        /**
232         * @param maxAuditDepth the maxAuditDepth to set
233         */
234        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
235            this.maxAuditDepth = maxAuditDepth;
236            if (audit != null) {
237                audit.setAuditDepth(maxAuditDepth);
238            }
239        }
240        
241        
242        /**
243         * @return the enableAudit
244         */
245        public boolean isEnableAudit() {
246            return enableAudit;
247        }
248    
249        /**
250         * @param enableAudit the enableAudit to set
251         */
252        public synchronized void setEnableAudit(boolean enableAudit) {
253            this.enableAudit = enableAudit;
254            if (enableAudit && started && audit==null) {
255                audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
256            }
257        }
258        
259        public boolean isTransient() {
260            return false;
261        }
262        
263           
264        /**
265         * set the audit
266         * @param audit new audit component
267         */
268        public void setMessageAudit(ActiveMQMessageAudit audit) {
269            this.audit=audit;
270        }
271        
272        
273        /**
274         * @return the audit
275         */
276        public ActiveMQMessageAudit getMessageAudit() {
277            return audit;
278        }
279        
280        public boolean isUseCache() {
281            return useCache;
282        }
283    
284        public void setUseCache(boolean useCache) {
285            this.useCache = useCache;
286        }
287    
288        public synchronized boolean isDuplicate(MessageId messageId) {
289            boolean unique = recordUniqueId(messageId);
290            rollback(messageId);
291            return !unique;
292        }
293        
294        /**
295         * records a message id and checks if it is a duplicate
296         * @param messageId
297         * @return true if id is unique, false otherwise.
298         */
299        public synchronized boolean recordUniqueId(MessageId messageId) {
300            if (!enableAudit || audit==null) {
301                return true;
302            }
303            return !audit.isDuplicate(messageId);
304        }
305        
306        public synchronized void rollback(MessageId id) {
307            if (audit != null) {
308                audit.rollback(id);
309            }
310        }
311        
312        protected synchronized boolean isStarted() {
313            return started;
314        }
315        
316        public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
317            boolean result = false;
318            Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
319            if (destinations != null) {
320                for (Destination dest:destinations) {
321                    if (dest.isPrioritizedMessages()) {
322                        result = true;
323                        break;
324                    }
325                }
326            }
327            return result;
328    
329        }
330    
331        public synchronized boolean isCacheEnabled() {
332            return cacheEnabled;
333        }
334    
335        public synchronized void setCacheEnabled(boolean val) {
336            cacheEnabled = val;
337        }
338    }