001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    
026    import org.apache.activemq.broker.region.MessageReference;
027    import org.apache.activemq.command.MessageId;
028    
029    public class PrioritizedPendingList implements PendingList {
030    
031        private static final Integer MAX_PRIORITY = 10;
032        private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
033        private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
034    
035        public PrioritizedPendingList() {
036            for (int i = 0; i < MAX_PRIORITY; i++) {
037                this.lists[i] = new OrderedPendingList();
038            }
039        }
040    
041        public PendingNode addMessageFirst(MessageReference message) {
042            PendingNode node = getList(message).addMessageFirst(message);
043            this.map.put(message.getMessageId(), node);
044            return node;
045        }
046    
047        public PendingNode addMessageLast(MessageReference message) {
048            PendingNode node = getList(message).addMessageLast(message);
049            this.map.put(message.getMessageId(), node);
050            return node;
051        }
052    
053        public void clear() {
054            for (int i = 0; i < MAX_PRIORITY; i++) {
055                this.lists[i].clear();
056            }
057            this.map.clear();
058        }
059    
060        public boolean isEmpty() {
061            return this.map.isEmpty();
062        }
063    
064        public Iterator<MessageReference> iterator() {
065            return new PrioritizedPendingListIterator();
066        }
067    
068        public PendingNode remove(MessageReference message) {
069            PendingNode node = null;
070            if (message != null) {
071                node = this.map.remove(message.getMessageId());
072                if (node != null) {
073                    node.getList().removeNode(node);
074                }
075            }
076            return node;
077        }
078    
079        public int size() {
080            return this.map.size();
081        }
082    
083        @Override
084        public String toString() {
085            return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
086        }
087    
088        protected int getPriority(MessageReference message) {
089            int priority = javax.jms.Message.DEFAULT_PRIORITY;
090            if (message.getMessageId() != null) {
091                priority = Math.max(message.getMessage().getPriority(), 0);
092                priority = Math.min(priority, 9);
093            }
094            return priority;
095        }
096    
097        protected OrderedPendingList getList(MessageReference msg) {
098            return lists[getPriority(msg)];
099        }
100    
101        private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
102            private int index = 0;
103            private int currentIndex = 0;
104            List<PendingNode> list = new ArrayList<PendingNode>(size());
105    
106            PrioritizedPendingListIterator() {
107                for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
108                    OrderedPendingList orderedPendingList = lists[i];
109                    if (!orderedPendingList.isEmpty()) {
110                        list.addAll(orderedPendingList.getAsList());
111                    }
112                }
113            }
114            public boolean hasNext() {
115                return list.size() > index;
116            }
117    
118            public MessageReference next() {
119                PendingNode node = list.get(this.index);
120                this.currentIndex = this.index;
121                this.index++;
122                return node.getMessage();
123            }
124    
125            public void remove() {
126                PendingNode node = list.get(this.currentIndex);
127                if (node != null) {
128                    map.remove(node.getMessage().getMessageId());
129                    node.getList().removeNode(node);
130                }
131            }
132        }
133    
134        @Override
135        public boolean contains(MessageReference message) {
136            if (map.values().contains(message)) {
137                return true;
138            }
139    
140            return false;
141        }
142    
143        @Override
144        public Collection<MessageReference> values() {
145            List<MessageReference> messageReferences = new ArrayList<MessageReference>();
146            for (PendingNode pendingNode : map.values()) {
147                messageReferences.add(pendingNode.getMessage());
148            }
149            return messageReferences;
150        }
151    
152        @Override
153        public void addAll(PendingList pendingList) {
154            for(MessageReference messageReference : pendingList) {
155                addMessageLast(messageReference);
156            }
157        }
158    
159    }