001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import org.apache.activemq.broker.ConnectionContext;
021    import org.apache.activemq.command.ActiveMQDestination;
022    import org.apache.activemq.command.Message;
023    import org.apache.activemq.command.MessageAck;
024    import org.apache.activemq.command.MessageId;
025    import org.apache.activemq.kaha.MapContainer;
026    import org.apache.activemq.kaha.StoreEntry;
027    import org.apache.activemq.store.MessageRecoveryListener;
028    import org.apache.activemq.store.MessageStore;
029    import org.apache.activemq.store.AbstractMessageStore;
030    import org.apache.activemq.usage.MemoryUsage;
031    import org.apache.activemq.usage.SystemUsage;
032    
033    /**
034     * An implementation of {@link org.apache.activemq.store.MessageStore} which
035     * keeps its messages in a Kaha {@link org.apache.activemq.kaha.MapContainer}
036     * 
037     * 
038     */
039    public class KahaMessageStore extends AbstractMessageStore {
040    
041        protected final MapContainer<MessageId, Message> messageContainer;
042        protected StoreEntry batchEntry;
043    
044        public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
045            throws IOException {
046            super(destination);
047            this.messageContainer = container;
048        }
049    
050        protected MessageId getMessageId(Object object) {
051            return ((Message)object).getMessageId();
052        }
053    
054        public Object getId() {
055            return messageContainer.getId();
056        }
057    
058        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
059            messageContainer.put(message.getMessageId(), message);
060            // TODO: we should do the following but it is not need if the message is
061            // being added within a persistence
062            // transaction
063            // but since I can't tell if one is running right now.. I'll leave this
064            // out for now.
065            // if( message.isResponseRequired() ) {
066            // messageContainer.force();
067            // }
068        }
069    
070        public synchronized Message getMessage(MessageId identity) throws IOException {
071            Message result = messageContainer.get(identity);
072            return result;
073        }
074    
075        protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
076            listener.recoverMessage(msg);
077            return listener.hasSpace();
078        }
079    
080        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
081            removeMessage(ack.getLastMessageId());
082        }
083    
084        public synchronized void removeMessage(MessageId msgId) throws IOException {
085            StoreEntry entry = messageContainer.getEntry(msgId);
086            if (entry != null) {
087                messageContainer.remove(entry);
088                if (messageContainer.isEmpty() || (batchEntry != null && batchEntry.equals(entry))) {
089                    resetBatching();
090                }
091            }
092        }
093    
094        public synchronized void recover(MessageRecoveryListener listener) throws Exception {
095            for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
096                .getNext(entry)) {
097                Message msg = (Message)messageContainer.getValue(entry);
098                if (!recoverMessage(listener, msg)) {
099                    break;
100                }
101            }
102        }
103    
104        public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
105            messageContainer.clear();
106        }
107    
108        public synchronized void delete() {
109            messageContainer.clear();
110        }
111    
112        /**
113         * @return the number of messages held by this destination
114         * @see org.apache.activemq.store.MessageStore#getMessageCount()
115         */
116        public int getMessageCount() {
117            return messageContainer.size();
118        }
119    
120        /**
121         * @param id
122         * @return null
123         * @throws Exception
124         * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
125         */
126        public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception {
127            return null;
128        }
129    
130        /**
131         * @param lastMessageId
132         * @param maxReturned
133         * @param listener
134         * @throws Exception
135         * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId,
136         *      int, org.apache.activemq.store.MessageRecoveryListener)
137         */
138        public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
139            throws Exception {
140            StoreEntry entry = batchEntry;
141            if (entry == null) {
142                entry = messageContainer.getFirst();
143            } else {
144                entry = messageContainer.refresh(entry);
145                entry = messageContainer.getNext(entry);
146                if (entry == null) {
147                    batchEntry = null;
148                }
149            }
150            if (entry != null) {
151                int count = 0;
152                do {
153                    Message msg = messageContainer.getValue(entry);
154                    if (msg != null) {
155                        recoverMessage(listener, msg);
156                        count++;
157                    }
158                    batchEntry = entry;
159                    entry = messageContainer.getNext(entry);
160                } while (entry != null && count < maxReturned && listener.hasSpace());
161            }
162        }
163    
164        /**
165         * @param nextToDispatch
166         * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
167         */
168        public synchronized void resetBatching() {
169            batchEntry = null;
170        }
171    
172        /**
173         * @return true if the store supports cursors
174         */
175        public boolean isSupportForCursors() {
176            return true;
177        }
178    
179        @Override
180        public void setBatch(MessageId messageId) {
181            batchEntry = messageContainer.getEntry(messageId);
182        }
183        
184    }