001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.memory.buffer;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.List;
023    
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.command.ActiveMQMessage;
026    import org.apache.activemq.command.Message;
027    
028    /**
029     * Allows messages to be added to the end of the buffer such that they are kept
030     * around and evicted in a FIFO manner.
031     * 
032     * 
033     */
034    public class MessageQueue {
035    
036        private MessageBuffer buffer;
037        private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
038        private int size;
039        private Object lock = new Object();
040        private int position;
041    
042        public MessageQueue(MessageBuffer buffer) {
043            this.buffer = buffer;
044        }
045    
046        public void add(MessageReference messageRef) {
047            Message message = messageRef.getMessageHardRef();
048            int delta = message.getSize();
049            int newSize = 0;
050            synchronized (lock) {
051                list.add(messageRef);
052                size += delta;
053                newSize = size;
054            }
055            buffer.onSizeChanged(this, delta, newSize);
056        }
057        
058        public void add(ActiveMQMessage message) {
059            int delta = message.getSize();
060            int newSize = 0;
061            synchronized (lock) {
062                list.add(message);
063                size += delta;
064                newSize = size;
065            }
066            buffer.onSizeChanged(this, delta, newSize);
067        }
068    
069        public int evictMessage() {
070            synchronized (lock) {
071                if (!list.isEmpty()) {
072                    ActiveMQMessage message = (ActiveMQMessage) list.removeFirst();
073                    int messageSize = message.getSize();
074                    size -= messageSize;
075                    return messageSize;
076                }
077            }
078            return 0;
079        }
080    
081        /**
082         * Returns a copy of the list
083         */
084        public List<MessageReference> getList() {
085            synchronized (lock) {
086                return new ArrayList<MessageReference>(list);
087            }
088        }
089    
090        public void appendMessages(List<MessageReference> answer) {
091            synchronized (lock) {
092                for (Iterator<MessageReference> iter = list.iterator(); iter.hasNext();) {
093                    answer.add(iter.next());
094                }
095            }
096        }
097    
098        public int getSize() {
099            synchronized (lock) {
100                return size;
101            }
102        }
103    
104        public int getPosition() {
105            return position;
106        }
107    
108        public void setPosition(int position) {
109            this.position = position;
110        }
111    
112        public void clear() {
113            synchronized (lock) {
114                list.clear();
115                size = 0;
116            }
117        }
118    
119    }