001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.Iterator;
022    import java.util.LinkedHashMap;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.store.MessageRecoveryListener;
032    import org.apache.activemq.store.AbstractMessageStore;
033    
034    /**
035     * An implementation of {@link org.apache.activemq.store.MessageStore} which
036     * uses a
037     * 
038     * 
039     */
040    public class MemoryMessageStore extends AbstractMessageStore {
041    
042        protected final Map<MessageId, Message> messageTable;
043        protected MessageId lastBatchId;
044    
045        public MemoryMessageStore(ActiveMQDestination destination) {
046            this(destination, new LinkedHashMap<MessageId, Message>());
047        }
048    
049        public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
050            super(destination);
051            this.messageTable = Collections.synchronizedMap(messageTable);
052        }
053    
054        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
055            synchronized (messageTable) {
056                messageTable.put(message.getMessageId(), message);
057            }
058            message.incrementReferenceCount();
059        }
060    
061        // public void addMessageReference(ConnectionContext context,MessageId
062        // messageId,long expirationTime,String messageRef)
063        // throws IOException{
064        // synchronized(messageTable){
065        // messageTable.put(messageId,messageRef);
066        // }
067        // }
068    
069        public Message getMessage(MessageId identity) throws IOException {
070            return messageTable.get(identity);
071        }
072    
073        // public String getMessageReference(MessageId identity) throws IOException{
074        // return (String)messageTable.get(identity);
075        // }
076    
077        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
078            removeMessage(ack.getLastMessageId());
079        }
080    
081        public void removeMessage(MessageId msgId) throws IOException {
082            synchronized (messageTable) {
083                Message removed = messageTable.remove(msgId);
084                if( removed !=null ) {
085                    removed.decrementReferenceCount();
086                }
087                if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
088                    lastBatchId = null;
089                }
090            }
091        }
092    
093        public void recover(MessageRecoveryListener listener) throws Exception {
094            // the message table is a synchronizedMap - so just have to synchronize
095            // here
096            synchronized (messageTable) {
097                for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
098                    Object msg = iter.next();
099                    if (msg.getClass() == MessageId.class) {
100                        listener.recoverMessageReference((MessageId)msg);
101                    } else {
102                        listener.recoverMessage((Message)msg);
103                    }
104                }
105            }
106        }
107    
108        public void removeAllMessages(ConnectionContext context) throws IOException {
109            synchronized (messageTable) {
110                messageTable.clear();
111            }
112        }
113    
114        public void delete() {
115            synchronized (messageTable) {
116                messageTable.clear();
117            }
118        }
119    
120        
121        public int getMessageCount() {
122            return messageTable.size();
123        }
124    
125        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
126            synchronized (messageTable) {
127                boolean pastLackBatch = lastBatchId == null;
128                int count = 0;
129                for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
130                    Map.Entry entry = (Entry)iter.next();
131                    if (pastLackBatch) {
132                        count++;
133                        Object msg = entry.getValue();
134                        lastBatchId = (MessageId)entry.getKey();
135                        if (msg.getClass() == MessageId.class) {
136                            listener.recoverMessageReference((MessageId)msg);
137                        } else {
138                            listener.recoverMessage((Message)msg);
139                        }
140                    } else {
141                        pastLackBatch = entry.getKey().equals(lastBatchId);
142                    }
143                }
144            }
145        }
146    
147        public void resetBatching() {
148            lastBatchId = null;
149        }
150    
151        @Override
152        public void setBatch(MessageId messageId) {
153            lastBatchId = messageId;
154        }
155        
156    }