001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.LinkedList;
021import java.util.List;
022
023import org.apache.activemq.command.MessageDispatch;
024
025public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
026    private static final Integer MAX_PRIORITY = 10;
027    private final Object mutex = new Object();
028    private final LinkedList<MessageDispatch>[] lists;
029    private boolean closed;
030    private boolean running;
031    private int size = 0;
032
033    @SuppressWarnings("unchecked")
034    public SimplePriorityMessageDispatchChannel() {
035        this.lists = new LinkedList[MAX_PRIORITY];
036        for (int i = 0; i < MAX_PRIORITY; i++) {
037            lists[i] = new LinkedList<MessageDispatch>();
038        }
039    }
040
041    /*
042     * (non-Javadoc)
043     *
044     * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
045     */
046    @Override
047    public void enqueue(MessageDispatch message) {
048        synchronized (mutex) {
049            getList(message).addLast(message);
050            this.size++;
051            mutex.notify();
052        }
053    }
054
055    /*
056     * (non-Javadoc)
057     *
058     * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
059     */
060    @Override
061    public void enqueueFirst(MessageDispatch message) {
062        synchronized (mutex) {
063            getList(message).addFirst(message);
064            this.size++;
065            mutex.notify();
066        }
067    }
068
069    /*
070     * (non-Javadoc)
071     *
072     * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
073     */
074    @Override
075    public boolean isEmpty() {
076        return this.size == 0;
077    }
078
079    /*
080     * (non-Javadoc)
081     *
082     * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
083     */
084    @Override
085    public MessageDispatch dequeue(long timeout) throws InterruptedException {
086        synchronized (mutex) {
087            // Wait until the consumer is ready to deliver messages.
088            while (timeout != 0 && !closed && (isEmpty() || !running)) {
089                if (timeout == -1) {
090                    mutex.wait();
091                } else {
092                    mutex.wait(timeout);
093                    break;
094                }
095            }
096            if (closed || !running || isEmpty()) {
097                return null;
098            }
099            return removeFirst();
100        }
101    }
102
103    /*
104     * (non-Javadoc)
105     *
106     * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
107     */
108    @Override
109    public MessageDispatch dequeueNoWait() {
110        synchronized (mutex) {
111            if (closed || !running || isEmpty()) {
112                return null;
113            }
114            return removeFirst();
115        }
116    }
117
118    /*
119     * (non-Javadoc)
120     *
121     * @see org.apache.activemq.MessageDispatchChannelI#peek()
122     */
123    @Override
124    public MessageDispatch peek() {
125        synchronized (mutex) {
126            if (closed || !running || isEmpty()) {
127                return null;
128            }
129            return getFirst();
130        }
131    }
132
133    /*
134     * (non-Javadoc)
135     *
136     * @see org.apache.activemq.MessageDispatchChannelI#start()
137     */
138    @Override
139    public void start() {
140        synchronized (mutex) {
141            running = true;
142            mutex.notifyAll();
143        }
144    }
145
146    /*
147     * (non-Javadoc)
148     *
149     * @see org.apache.activemq.MessageDispatchChannelI#stop()
150     */
151    @Override
152    public void stop() {
153        synchronized (mutex) {
154            running = false;
155            mutex.notifyAll();
156        }
157    }
158
159    /*
160     * (non-Javadoc)
161     *
162     * @see org.apache.activemq.MessageDispatchChannelI#close()
163     */
164    @Override
165    public void close() {
166        synchronized (mutex) {
167            if (!closed) {
168                running = false;
169                closed = true;
170            }
171            mutex.notifyAll();
172        }
173    }
174
175    /*
176     * (non-Javadoc)
177     *
178     * @see org.apache.activemq.MessageDispatchChannelI#clear()
179     */
180    @Override
181    public void clear() {
182        synchronized (mutex) {
183            for (int i = 0; i < MAX_PRIORITY; i++) {
184                lists[i].clear();
185            }
186            this.size = 0;
187        }
188    }
189
190    /*
191     * (non-Javadoc)
192     *
193     * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
194     */
195    @Override
196    public boolean isClosed() {
197        return closed;
198    }
199
200    /*
201     * (non-Javadoc)
202     *
203     * @see org.apache.activemq.MessageDispatchChannelI#size()
204     */
205    @Override
206    public int size() {
207        synchronized (mutex) {
208            return this.size;
209        }
210    }
211
212    /*
213     * (non-Javadoc)
214     *
215     * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
216     */
217    @Override
218    public Object getMutex() {
219        return mutex;
220    }
221
222    /*
223     * (non-Javadoc)
224     *
225     * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
226     */
227    @Override
228    public boolean isRunning() {
229        return running;
230    }
231
232    /*
233     * (non-Javadoc)
234     *
235     * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
236     */
237    @Override
238    public List<MessageDispatch> removeAll() {
239        synchronized (mutex) {
240            ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
241            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
242                List<MessageDispatch> list = lists[i];
243                result.addAll(list);
244                size -= list.size();
245                list.clear();
246            }
247            return result;
248        }
249    }
250
251    @Override
252    public String toString() {
253        String result = "";
254        for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
255            result += i + ":{" + lists[i].toString() + "}";
256        }
257        return result;
258    }
259
260    protected int getPriority(MessageDispatch message) {
261        int priority = javax.jms.Message.DEFAULT_PRIORITY;
262        if (message.getMessage() != null) {
263            priority = Math.max(message.getMessage().getPriority(), 0);
264            priority = Math.min(priority, 9);
265        }
266        return priority;
267    }
268
269    protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
270        return lists[getPriority(md)];
271    }
272
273    private final MessageDispatch removeFirst() {
274        if (this.size > 0) {
275            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
276                LinkedList<MessageDispatch> list = lists[i];
277                if (!list.isEmpty()) {
278                    this.size--;
279                    return list.removeFirst();
280                }
281            }
282        }
283        return null;
284    }
285
286    private final MessageDispatch getFirst() {
287        if (this.size > 0) {
288            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
289                LinkedList<MessageDispatch> list = lists[i];
290                if (!list.isEmpty()) {
291                    return list.getFirst();
292                }
293            }
294        }
295        return null;
296    }
297}