001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.Iterator;
022    import java.util.LinkedHashMap;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.store.MessageRecoveryListener;
032    import org.apache.activemq.store.MessageStore;
033    import org.apache.activemq.store.AbstractMessageStore;
034    import org.apache.activemq.usage.MemoryUsage;
035    import org.apache.activemq.usage.SystemUsage;
036    
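/**
 * An implementation of {@link org.apache.activemq.store.MessageStore} which
 * uses an in-memory {@link java.util.Map} to hold its messages. By default
 * the map is a {@link java.util.LinkedHashMap}; whichever map is supplied is
 * wrapped with {@link java.util.Collections#synchronizedMap(java.util.Map)}.
 */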
043    public class MemoryMessageStore extends AbstractMessageStore {
044    
045        protected final Map<MessageId, Message> messageTable;
046        protected MessageId lastBatchId;
047    
048        public MemoryMessageStore(ActiveMQDestination destination) {
049            this(destination, new LinkedHashMap<MessageId, Message>());
050        }
051    
052        public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
053            super(destination);
054            this.messageTable = Collections.synchronizedMap(messageTable);
055        }
056    
057        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
058            synchronized (messageTable) {
059                messageTable.put(message.getMessageId(), message);
060            }
061            message.incrementReferenceCount();
062        }
063    
064        // public void addMessageReference(ConnectionContext context,MessageId
065        // messageId,long expirationTime,String messageRef)
066        // throws IOException{
067        // synchronized(messageTable){
068        // messageTable.put(messageId,messageRef);
069        // }
070        // }
071    
072        public Message getMessage(MessageId identity) throws IOException {
073            return messageTable.get(identity);
074        }
075    
076        // public String getMessageReference(MessageId identity) throws IOException{
077        // return (String)messageTable.get(identity);
078        // }
079    
080        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
081            removeMessage(ack.getLastMessageId());
082        }
083    
084        public void removeMessage(MessageId msgId) throws IOException {
085            synchronized (messageTable) {
086                Message removed = messageTable.remove(msgId);
087                if( removed !=null ) {
088                    removed.decrementReferenceCount();
089                }
090                if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
091                    lastBatchId = null;
092                }
093            }
094        }
095    
096        public void recover(MessageRecoveryListener listener) throws Exception {
097            // the message table is a synchronizedMap - so just have to synchronize
098            // here
099            synchronized (messageTable) {
100                for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
101                    Object msg = iter.next();
102                    if (msg.getClass() == MessageId.class) {
103                        listener.recoverMessageReference((MessageId)msg);
104                    } else {
105                        listener.recoverMessage((Message)msg);
106                    }
107                }
108            }
109        }
110    
111        public void removeAllMessages(ConnectionContext context) throws IOException {
112            synchronized (messageTable) {
113                messageTable.clear();
114            }
115        }
116    
117        public void delete() {
118            synchronized (messageTable) {
119                messageTable.clear();
120            }
121        }
122    
123        
124        public int getMessageCount() {
125            return messageTable.size();
126        }
127    
128        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
129            synchronized (messageTable) {
130                boolean pastLackBatch = lastBatchId == null;
131                int count = 0;
132                for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
133                    Map.Entry entry = (Entry)iter.next();
134                    if (pastLackBatch) {
135                        count++;
136                        Object msg = entry.getValue();
137                        lastBatchId = (MessageId)entry.getKey();
138                        if (msg.getClass() == MessageId.class) {
139                            listener.recoverMessageReference((MessageId)msg);
140                        } else {
141                            listener.recoverMessage((Message)msg);
142                        }
143                    } else {
144                        pastLackBatch = entry.getKey().equals(lastBatchId);
145                    }
146                }
147            }
148        }
149    
150        public void resetBatching() {
151            lastBatchId = null;
152        }
153    
154        @Override
155        public void setBatch(MessageId messageId) {
156            lastBatchId = messageId;
157        }
158        
159    }