001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.List;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.region.Destination;
025    import org.apache.activemq.broker.region.MessageReference;
026    import org.apache.activemq.broker.region.QueueMessageReference;
027    
028    /**
029     * hold pending messages in a linked list (messages awaiting disptach to a
030     * consumer) cursor
031     * 
032     * 
033     */
034    public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
035        private final PendingList list;
036        private Iterator<MessageReference> iter;
037        
038        public VMPendingMessageCursor(boolean prioritizedMessages) {
039            super(prioritizedMessages);
040            if (this.prioritizedMessages) {
041                this.list= new PrioritizedPendingList();
042            }else {
043                this.list = new OrderedPendingList();
044            }
045        }
046    
047        
048        public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
049                throws Exception {
050            List<MessageReference> rc = new ArrayList<MessageReference>();
051            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
052                MessageReference r = iterator.next();
053                if (r.getRegionDestination() == destination) {
054                    r.decrementReferenceCount();
055                    rc.add(r);
056                    iterator.remove();
057                }
058            }
059            return rc;
060        }
061    
062        /**
063         * @return true if there are no pending messages
064         */
065        
066        public synchronized boolean isEmpty() {
067            if (list.isEmpty()) {
068                return true;
069            } else {
070                for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
071                    MessageReference node = iterator.next();
072                    if (node == QueueMessageReference.NULL_MESSAGE) {
073                        continue;
074                    }
075                    if (!node.isDropped()) {
076                        return false;
077                    }
078                    // We can remove dropped references.
079                    iterator.remove();
080                }
081                return true;
082            }
083        }
084    
085        /**
086         * reset the cursor
087         */
088        
089        public synchronized void reset() {
090            iter = list.iterator();
091            last = null;
092        }
093    
094        /**
095         * add message to await dispatch
096         * 
097         * @param node
098         */
099        
100        public synchronized void addMessageLast(MessageReference node) {
101            node.incrementReferenceCount();
102            list.addMessageLast(node);
103        }
104    
105        /**
106         * add message to await dispatch
107         * 
108         * @param position
109         * @param node
110         */
111        
112        public synchronized void addMessageFirst(MessageReference node) {
113            node.incrementReferenceCount();
114            list.addMessageFirst(node);
115        }
116    
117        /**
118         * @return true if there pending messages to dispatch
119         */
120        
121        public synchronized boolean hasNext() {
122            return iter.hasNext();
123        }
124    
125        /**
126         * @return the next pending message
127         */
128        
129        public synchronized MessageReference next() {
130            last = iter.next();
131            if (last != null) {
132                last.incrementReferenceCount();
133            }
134            return last;
135        }
136    
137        /**
138         * remove the message at the cursor position
139         */
140        
141        public synchronized void remove() {
142            if (last != null) {
143                last.decrementReferenceCount();
144            }
145            iter.remove();
146        }
147    
148        /**
149         * @return the number of pending messages
150         */
151        
152        public synchronized int size() {
153            return list.size();
154        }
155    
156        /**
157         * clear all pending messages
158         */
159        
160        public synchronized void clear() {
161            for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
162                MessageReference ref = i.next();
163                ref.decrementReferenceCount();
164            }
165            list.clear();
166        }
167    
168        
169        public synchronized void remove(MessageReference node) {
170            list.remove(node);
171            node.decrementReferenceCount();
172        }
173    
174        /**
175         * Page in a restricted number of messages
176         * 
177         * @param maxItems
178         * @return a list of paged in messages
179         */
180        
181        public LinkedList<MessageReference> pageInList(int maxItems) {
182            LinkedList<MessageReference> result = new LinkedList<MessageReference>();
183            for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
184                MessageReference ref = i.next();
185                ref.incrementReferenceCount();
186                result.add(ref);
187                if (result.size() >= maxItems) {
188                    break;
189                }
190            }
191            return result;
192        }
193    
194        
195        public boolean isTransient() {
196            return true;
197        }
198    
199        
200        public void destroy() throws Exception {
201            super.destroy();
202            clear();
203        }
204    }