001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.ArrayList;
020    import java.util.LinkedList;
021    import java.util.List;
022    import org.apache.activemq.command.MessageDispatch;
023    
024    public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
025        private static final Integer MAX_PRIORITY = 10;
026        private final Object mutex = new Object();
027        private final LinkedList<MessageDispatch>[] lists;
028        private boolean closed;
029        private boolean running;
030        private int size = 0;
031    
032        public SimplePriorityMessageDispatchChannel() {
033            this.lists = new LinkedList[MAX_PRIORITY];
034            for (int i = 0; i < MAX_PRIORITY; i++) {
035                lists[i] = new LinkedList<MessageDispatch>();
036            }
037        }
038    
039        /*
040         * (non-Javadoc)
041         * @see
042         * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
043         * .command.MessageDispatch)
044         */
045        public void enqueue(MessageDispatch message) {
046            synchronized (mutex) {
047                getList(message).addLast(message);
048    
049                this.size++;
050                mutex.notify();
051            }
052        }
053    
054        /*
055         * (non-Javadoc)
056         * @see
057         * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
058         * .command.MessageDispatch)
059         */
060        public void enqueueFirst(MessageDispatch message) {
061            synchronized (mutex) {
062                getList(message).addFirst(message);
063                this.size++;
064                mutex.notify();
065            }
066        }
067    
068        /*
069         * (non-Javadoc)
070         * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
071         */
072        public boolean isEmpty() {
073            // synchronized (mutex) {
074            return this.size == 0;
075            // }
076        }
077    
078        /*
079         * (non-Javadoc)
080         * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
081         */
082        public MessageDispatch dequeue(long timeout) throws InterruptedException {
083            synchronized (mutex) {
084                // Wait until the consumer is ready to deliver messages.
085                while (timeout != 0 && !closed && (isEmpty() || !running)) {
086                    if (timeout == -1) {
087                        mutex.wait();
088                    } else {
089                        mutex.wait(timeout);
090                        break;
091                    }
092                }
093                if (closed || !running || isEmpty()) {
094                    return null;
095                }
096                return removeFirst();
097            }
098        }
099    
100        /*
101         * (non-Javadoc)
102         * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
103         */
104        public MessageDispatch dequeueNoWait() {
105            synchronized (mutex) {
106                if (closed || !running || isEmpty()) {
107                    return null;
108                }
109                return removeFirst();
110            }
111        }
112    
113        /*
114         * (non-Javadoc)
115         * @see org.apache.activemq.MessageDispatchChannelI#peek()
116         */
117        public MessageDispatch peek() {
118            synchronized (mutex) {
119                if (closed || !running || isEmpty()) {
120                    return null;
121                }
122                return getFirst();
123            }
124        }
125    
126        /*
127         * (non-Javadoc)
128         * @see org.apache.activemq.MessageDispatchChannelI#start()
129         */
130        public void start() {
131            synchronized (mutex) {
132                running = true;
133                mutex.notifyAll();
134            }
135        }
136    
137        /*
138         * (non-Javadoc)
139         * @see org.apache.activemq.MessageDispatchChannelI#stop()
140         */
141        public void stop() {
142            synchronized (mutex) {
143                running = false;
144                mutex.notifyAll();
145            }
146        }
147    
148        /*
149         * (non-Javadoc)
150         * @see org.apache.activemq.MessageDispatchChannelI#close()
151         */
152        public void close() {
153            synchronized (mutex) {
154                if (!closed) {
155                    running = false;
156                    closed = true;
157                }
158                mutex.notifyAll();
159            }
160        }
161    
162        /*
163         * (non-Javadoc)
164         * @see org.apache.activemq.MessageDispatchChannelI#clear()
165         */
166        public void clear() {
167            synchronized (mutex) {
168                for (int i = 0; i < MAX_PRIORITY; i++) {
169                    lists[i].clear();
170                }
171            }
172        }
173    
174        /*
175         * (non-Javadoc)
176         * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
177         */
178        public boolean isClosed() {
179            return closed;
180        }
181    
182        /*
183         * (non-Javadoc)
184         * @see org.apache.activemq.MessageDispatchChannelI#size()
185         */
186        public int size() {
187            synchronized (mutex) {
188                return this.size;
189            }
190        }
191    
192        /*
193         * (non-Javadoc)
194         * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
195         */
196        public Object getMutex() {
197            return mutex;
198        }
199    
200        /*
201         * (non-Javadoc)
202         * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
203         */
204        public boolean isRunning() {
205            return running;
206        }
207    
208        /*
209         * (non-Javadoc)
210         * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
211         */
212        public List<MessageDispatch> removeAll() {
213    
214            synchronized (mutex) {
215                ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
216                for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
217                    List<MessageDispatch> list = lists[i];
218                    result.addAll(list);
219                    size -= list.size();
220                    list.clear();
221                }
222                return result;
223            }
224        }
225    
226        @Override
227        public String toString() {
228    
229            String result = "";
230            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
231                result += i + ":{" + lists[i].toString() + "}";
232            }
233            return result;
234    
235        }
236    
237        protected int getPriority(MessageDispatch message) {
238            int priority = javax.jms.Message.DEFAULT_PRIORITY;
239            if (message.getMessage() != null) {
240                    priority = Math.max(message.getMessage().getPriority(), 0);
241                    priority = Math.min(priority, 9);
242            }
243            return priority;
244        }
245    
246        protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
247            return lists[getPriority(md)];
248        }
249    
250        private final MessageDispatch removeFirst() {
251            if (this.size > 0) {
252                for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
253                    LinkedList<MessageDispatch> list = lists[i];
254                    if (!list.isEmpty()) {
255                        this.size--;
256                        return list.removeFirst();
257                    }
258                }
259            }
260            return null;
261        }
262    
263        private final MessageDispatch getFirst() {
264            if (this.size > 0) {
265                for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
266                    LinkedList<MessageDispatch> list = lists[i];
267                    if (!list.isEmpty()) {
268                        return list.getFirst();
269                    }
270                }
271            }
272            return null;
273        }
274    }