001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import org.apache.activemq.broker.ConnectionContext;
021    import org.apache.activemq.command.ActiveMQDestination;
022    import org.apache.activemq.command.Message;
023    import org.apache.activemq.command.MessageAck;
024    import org.apache.activemq.command.MessageId;
025    import org.apache.activemq.kaha.MapContainer;
026    import org.apache.activemq.kaha.StoreEntry;
027    import org.apache.activemq.store.MessageRecoveryListener;
028    import org.apache.activemq.store.AbstractMessageStore;
029    
030    /**
031     * An implementation of {@link org.apache.activemq.store.MessageStore} which
032     * uses a JPS Container
033     * 
034     * 
035     */
036    public class KahaMessageStore extends AbstractMessageStore {
037    
038        protected final MapContainer<MessageId, Message> messageContainer;
039        protected StoreEntry batchEntry;
040    
041        public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
042            throws IOException {
043            super(destination);
044            this.messageContainer = container;
045        }
046    
047        protected MessageId getMessageId(Object object) {
048            return ((Message)object).getMessageId();
049        }
050    
051        public Object getId() {
052            return messageContainer.getId();
053        }
054    
055        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
056            messageContainer.put(message.getMessageId(), message);
057            // TODO: we should do the following but it is not need if the message is
058            // being added within a persistence
059            // transaction
060            // but since I can't tell if one is running right now.. I'll leave this
061            // out for now.
062            // if( message.isResponseRequired() ) {
063            // messageContainer.force();
064            // }
065        }
066    
067        public synchronized Message getMessage(MessageId identity) throws IOException {
068            Message result = messageContainer.get(identity);
069            return result;
070        }
071    
072        protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
073            listener.recoverMessage(msg);
074            return listener.hasSpace();
075        }
076    
077        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
078            removeMessage(ack.getLastMessageId());
079        }
080    
081        public synchronized void removeMessage(MessageId msgId) throws IOException {
082            StoreEntry entry = messageContainer.getEntry(msgId);
083            if (entry != null) {
084                messageContainer.remove(entry);
085                if (messageContainer.isEmpty() || (batchEntry != null && batchEntry.equals(entry))) {
086                    resetBatching();
087                }
088            }
089        }
090    
091        public synchronized void recover(MessageRecoveryListener listener) throws Exception {
092            for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
093                .getNext(entry)) {
094                Message msg = (Message)messageContainer.getValue(entry);
095                if (!recoverMessage(listener, msg)) {
096                    break;
097                }
098            }
099        }
100    
101        public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
102            messageContainer.clear();
103        }
104    
105        public synchronized void delete() {
106            messageContainer.clear();
107        }
108    
109        /**
110         * @return the number of messages held by this destination
111         * @see org.apache.activemq.store.MessageStore#getMessageCount()
112         */
113        public int getMessageCount() {
114            return messageContainer.size();
115        }
116    
117        /**
118         * @param id
119         * @return null
120         * @throws Exception
121         * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
122         */
123        public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception {
124            return null;
125        }
126    
127        /**
128         * @param lastMessageId
129         * @param maxReturned
130         * @param listener
131         * @throws Exception
132         * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId,
133         *      int, org.apache.activemq.store.MessageRecoveryListener)
134         */
135        public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
136            throws Exception {
137            StoreEntry entry = batchEntry;
138            if (entry == null) {
139                entry = messageContainer.getFirst();
140            } else {
141                entry = messageContainer.refresh(entry);
142                entry = messageContainer.getNext(entry);
143                if (entry == null) {
144                    batchEntry = null;
145                }
146            }
147            if (entry != null) {
148                int count = 0;
149                do {
150                    Message msg = messageContainer.getValue(entry);
151                    if (msg != null) {
152                        recoverMessage(listener, msg);
153                        count++;
154                    }
155                    batchEntry = entry;
156                    entry = messageContainer.getNext(entry);
157                } while (entry != null && count < maxReturned && listener.hasSpace());
158            }
159        }
160    
161        /**
162         * @param nextToDispatch
163         * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
164         */
165        public synchronized void resetBatching() {
166            batchEntry = null;
167        }
168    
169        /**
170         * @return true if the store supports cursors
171         */
172        public boolean isSupportForCursors() {
173            return true;
174        }
175    
176        @Override
177        public void setBatch(MessageId messageId) {
178            batchEntry = messageContainer.getEntry(messageId);
179        }
180        
181    }