001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.ArrayList;
020    import java.util.LinkedList;
021    import java.util.List;
022    import org.apache.activemq.command.MessageDispatch;
023    
024    public class FifoMessageDispatchChannel implements MessageDispatchChannel {
025    
026        private final Object mutex = new Object();
027        private final LinkedList<MessageDispatch> list;
028        private boolean closed;
029        private boolean running;
030    
031        public FifoMessageDispatchChannel() {
032            this.list = new LinkedList<MessageDispatch>();
033        }
034    
035        /* (non-Javadoc)
036         * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
037         */
038        public void enqueue(MessageDispatch message) {
039            synchronized (mutex) {
040                list.addLast(message);
041                mutex.notify();
042            }
043        }
044    
045        /* (non-Javadoc)
046         * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
047         */
048        public void enqueueFirst(MessageDispatch message) {
049            synchronized (mutex) {
050                list.addFirst(message);
051                mutex.notify();
052            }
053        }
054    
055        /* (non-Javadoc)
056         * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
057         */
058        public boolean isEmpty() {
059            synchronized (mutex) {
060                return list.isEmpty();
061            }
062        }
063    
064        /* (non-Javadoc)
065         * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
066         */
067        public MessageDispatch dequeue(long timeout) throws InterruptedException {
068            synchronized (mutex) {
069                // Wait until the consumer is ready to deliver messages.
070                while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
071                    if (timeout == -1) {
072                        mutex.wait();
073                    } else {
074                        mutex.wait(timeout);
075                        break;
076                    }
077                }
078                if (closed || !running || list.isEmpty()) {
079                    return null;
080                }
081                return list.removeFirst();
082            }
083        }
084    
085        /* (non-Javadoc)
086         * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
087         */
088        public MessageDispatch dequeueNoWait() {
089            synchronized (mutex) {
090                if (closed || !running || list.isEmpty()) {
091                    return null;
092                }
093                return list.removeFirst();
094            }
095        }
096    
097        /* (non-Javadoc)
098         * @see org.apache.activemq.MessageDispatchChannelI#peek()
099         */
100        public MessageDispatch peek() {
101            synchronized (mutex) {
102                if (closed || !running || list.isEmpty()) {
103                    return null;
104                }
105                return list.getFirst();
106            }
107        }
108    
109        /* (non-Javadoc)
110         * @see org.apache.activemq.MessageDispatchChannelI#start()
111         */
112        public void start() {
113            synchronized (mutex) {
114                running = true;
115                mutex.notifyAll();
116            }
117        }
118    
119        /* (non-Javadoc)
120         * @see org.apache.activemq.MessageDispatchChannelI#stop()
121         */
122        public void stop() {
123            synchronized (mutex) {
124                running = false;
125                mutex.notifyAll();
126            }
127        }
128    
129        /* (non-Javadoc)
130         * @see org.apache.activemq.MessageDispatchChannelI#close()
131         */
132        public void close() {
133            synchronized (mutex) {
134                if (!closed) {
135                    running = false;
136                    closed = true;
137                }
138                mutex.notifyAll();
139            }
140        }
141    
142        /* (non-Javadoc)
143         * @see org.apache.activemq.MessageDispatchChannelI#clear()
144         */
145        public void clear() {
146            synchronized (mutex) {
147                list.clear();
148            }
149        }
150    
151        /* (non-Javadoc)
152         * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
153         */
154        public boolean isClosed() {
155            return closed;
156        }
157    
158        /* (non-Javadoc)
159         * @see org.apache.activemq.MessageDispatchChannelI#size()
160         */
161        public int size() {
162            synchronized (mutex) {
163                return list.size();
164            }
165        }
166    
167        /* (non-Javadoc)
168         * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
169         */
170        public Object getMutex() {
171            return mutex;
172        }
173    
174        /* (non-Javadoc)
175         * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
176         */
177        public boolean isRunning() {
178            return running;
179        }
180    
181        /* (non-Javadoc)
182         * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
183         */
184        public List<MessageDispatch> removeAll() {
185            synchronized (mutex) {
186                ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
187                list.clear();
188                return rc;
189            }
190        }
191    
192        @Override
193        public String toString() {
194            synchronized (mutex) {
195                return list.toString();
196            }
197        }
198    }