001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.LinkedList;
021import java.util.List;
022import org.apache.activemq.command.MessageDispatch;
023
024public class FifoMessageDispatchChannel implements MessageDispatchChannel {
025
026    private final Object mutex = new Object();
027    private final LinkedList<MessageDispatch> list;
028    private boolean closed;
029    private boolean running;
030
031    public FifoMessageDispatchChannel() {
032        this.list = new LinkedList<MessageDispatch>();
033    }
034
035    /* (non-Javadoc)
036     * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
037     */
038    public void enqueue(MessageDispatch message) {
039        synchronized (mutex) {
040            list.addLast(message);
041            mutex.notify();
042        }
043    }
044
045    /* (non-Javadoc)
046     * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
047     */
048    public void enqueueFirst(MessageDispatch message) {
049        synchronized (mutex) {
050            list.addFirst(message);
051            mutex.notify();
052        }
053    }
054
055    /* (non-Javadoc)
056     * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
057     */
058    public boolean isEmpty() {
059        synchronized (mutex) {
060            return list.isEmpty();
061        }
062    }
063
064    /* (non-Javadoc)
065     * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
066     */
067    public MessageDispatch dequeue(long timeout) throws InterruptedException {
068        synchronized (mutex) {
069            // Wait until the consumer is ready to deliver messages.
070            while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
071                if (timeout == -1) {
072                    mutex.wait();
073                } else {
074                    mutex.wait(timeout);
075                    break;
076                }
077            }
078            if (closed || !running || list.isEmpty()) {
079                return null;
080            }
081            return list.removeFirst();
082        }
083    }
084
085    /* (non-Javadoc)
086     * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
087     */
088    public MessageDispatch dequeueNoWait() {
089        synchronized (mutex) {
090            if (closed || !running || list.isEmpty()) {
091                return null;
092            }
093            return list.removeFirst();
094        }
095    }
096
097    /* (non-Javadoc)
098     * @see org.apache.activemq.MessageDispatchChannelI#peek()
099     */
100    public MessageDispatch peek() {
101        synchronized (mutex) {
102            if (closed || !running || list.isEmpty()) {
103                return null;
104            }
105            return list.getFirst();
106        }
107    }
108
109    /* (non-Javadoc)
110     * @see org.apache.activemq.MessageDispatchChannelI#start()
111     */
112    public void start() {
113        synchronized (mutex) {
114            running = true;
115            mutex.notifyAll();
116        }
117    }
118
119    /* (non-Javadoc)
120     * @see org.apache.activemq.MessageDispatchChannelI#stop()
121     */
122    public void stop() {
123        synchronized (mutex) {
124            running = false;
125            mutex.notifyAll();
126        }
127    }
128
129    /* (non-Javadoc)
130     * @see org.apache.activemq.MessageDispatchChannelI#close()
131     */
132    public void close() {
133        synchronized (mutex) {
134            if (!closed) {
135                running = false;
136                closed = true;
137            }
138            mutex.notifyAll();
139        }
140    }
141
142    /* (non-Javadoc)
143     * @see org.apache.activemq.MessageDispatchChannelI#clear()
144     */
145    public void clear() {
146        synchronized (mutex) {
147            list.clear();
148        }
149    }
150
151    /* (non-Javadoc)
152     * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
153     */
154    public boolean isClosed() {
155        return closed;
156    }
157
158    /* (non-Javadoc)
159     * @see org.apache.activemq.MessageDispatchChannelI#size()
160     */
161    public int size() {
162        synchronized (mutex) {
163            return list.size();
164        }
165    }
166
167    /* (non-Javadoc)
168     * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
169     */
170    public Object getMutex() {
171        return mutex;
172    }
173
174    /* (non-Javadoc)
175     * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
176     */
177    public boolean isRunning() {
178        return running;
179    }
180
181    /* (non-Javadoc)
182     * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
183     */
184    public List<MessageDispatch> removeAll() {
185        synchronized (mutex) {
186            ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
187            list.clear();
188            return rc;
189        }
190    }
191
192    @Override
193    public String toString() {
194        synchronized (mutex) {
195            return list.toString();
196        }
197    }
198}