001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.HashSet;
021    import java.util.Set;
022    import java.util.concurrent.locks.Lock;
023    import java.util.concurrent.locks.ReentrantLock;
024    
025    import org.apache.activemq.ActiveMQMessageAudit;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.kaha.MapContainer;
032    import org.apache.activemq.kaha.MessageAckWithLocation;
033    import org.apache.activemq.kaha.StoreEntry;
034    import org.apache.activemq.store.AbstractMessageStore;
035    import org.apache.activemq.store.MessageRecoveryListener;
036    import org.apache.activemq.store.ReferenceStore;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * @author rajdavies
042     *
043     */
044    public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStore.class);
047        protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
048        protected KahaReferenceStoreAdapter adapter;
049        // keep track of dispatched messages so that duplicate sends that follow a successful
050        // dispatch can be suppressed.
051        protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit();
052        private StoreEntry batchEntry;
053        private String lastBatchId;
054        protected final Lock lock = new ReentrantLock();
055    
056        public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
057                                  ActiveMQDestination destination) throws IOException {
058            super(destination);
059            this.adapter = adapter;
060            this.messageContainer = container;
061        }
062        
063        public Lock getStoreLock() {
064            return lock;
065        }
066    
067        public void dispose(ConnectionContext context) {
068            super.dispose(context);
069            this.messageContainer.delete();
070            this.adapter.removeReferenceStore(this);
071        }
072    
073        protected MessageId getMessageId(Object object) {
074            return new MessageId(((ReferenceRecord)object).getMessageId());
075        }
076    
077        public void addMessage(ConnectionContext context, Message message) throws IOException {
078            throw new RuntimeException("Use addMessageReference instead");
079        }
080    
081        public Message getMessage(MessageId identity) throws IOException {
082            throw new RuntimeException("Use addMessageReference instead");
083        }
084    
085        protected final boolean recoverReference(MessageRecoveryListener listener,
086                ReferenceRecord record) throws Exception {
087            MessageId id = new MessageId(record.getMessageId());
088            if (listener.hasSpace()) {
089                return listener.recoverMessageReference(id);
090            }
091            return false;
092        }
093    
094        public void recover(MessageRecoveryListener listener) throws Exception {
095            lock.lock();
096            try {
097                for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
098                    .getNext(entry)) {
099                    ReferenceRecord record = messageContainer.getValue(entry);
100                    if (!recoverReference(listener, record)) {
101                        break;
102                    }
103                }
104            }finally {
105                lock.unlock();
106            }
107        }
108    
109        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
110            throws Exception {
111            lock.lock();
112            try {
113                StoreEntry entry = batchEntry;
114                if (entry == null) {
115                    entry = messageContainer.getFirst();
116                } else {
117                    entry = messageContainer.refresh(entry);
118                    if (entry != null) {
119                        entry = messageContainer.getNext(entry);
120                    }
121                }
122                if (entry != null) {      
123                    int count = 0;
124                    do {
125                        ReferenceRecord msg = messageContainer.getValue(entry);
126                        if (msg != null ) {
127                            if (recoverReference(listener, msg)) {
128                                count++;
129                                lastBatchId = msg.getMessageId();
130                            } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
131                                if (LOG.isDebugEnabled()) {
132                                    LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId());
133                                }
134                                // give usage limits a chance to reclaim
135                                break;
136                            } else {
137                                // skip duplicate and continue
138                                if (LOG.isDebugEnabled()) {
139                                    LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId());
140                                }
141                            }                        
142                        } else {
143                            lastBatchId = null;
144                        }
145                        batchEntry = entry;
146                        entry = messageContainer.getNext(entry);
147                    } while (entry != null && count < maxReturned && listener.hasSpace());
148                }
149            }finally {
150                lock.unlock();
151            }
152        }
153    
154        public boolean addMessageReference(ConnectionContext context, MessageId messageId,
155                                                     ReferenceData data) throws IOException {
156            
157            boolean uniqueueReferenceAdded = false;
158            lock.lock();
159            try {
160                if (!isDuplicate(messageId)) {
161                    ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
162                    messageContainer.put(messageId, record);
163                    uniqueueReferenceAdded = true;
164                    addInterest(record);
165                    if (LOG.isDebugEnabled()) {
166                        LOG.debug(destination.getPhysicalName() + " add: " + messageId);
167                    }
168                }
169            } finally {
170                lock.unlock();
171            }
172            return uniqueueReferenceAdded;
173        }
174    
175        protected boolean isDuplicate(final MessageId messageId) {
176            boolean duplicate = messageContainer.containsKey(messageId);
177            if (!duplicate) {
178                duplicate = dispatchAudit.isDuplicate(messageId);
179                if (duplicate) {
180                    if (LOG.isDebugEnabled()) {
181                        LOG.debug(destination.getPhysicalName()
182                            + " ignoring duplicated (add) message reference, already dispatched: "
183                            + messageId);
184                    }
185                }
186            } else if (LOG.isDebugEnabled()) {
187                LOG.debug(destination.getPhysicalName()
188                        + " ignoring duplicated (add) message reference, already in store: " + messageId);
189            }
190            return duplicate;
191        }
192        
193        public ReferenceData getMessageReference(MessageId identity) throws IOException {
194            lock.lock();
195            try {
196                ReferenceRecord result = messageContainer.get(identity);
197                if (result == null) {
198                    return null;
199                }
200                return result.getData();
201            }finally {
202                lock.unlock();
203            }
204        }
205    
206        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
207            lock.lock();
208            try {
209                MessageId msgId = ack.getLastMessageId();
210                StoreEntry entry = messageContainer.getEntry(msgId);
211                if (entry != null) {
212                    ReferenceRecord rr = messageContainer.remove(msgId);
213                    if (rr != null) {
214                        removeInterest(rr);
215                        if (ack instanceof MessageAckWithLocation) {
216                            recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId());
217                        }
218                        dispatchAudit.isDuplicate(msgId);
219                        if (LOG.isDebugEnabled()) {
220                            LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
221                        }
222                        if (messageContainer.isEmpty()
223                            || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
224                            || (batchEntry != null && batchEntry.equals(entry))) {
225                            resetBatching();
226                        }
227                    }
228                }
229            }finally {
230                lock.unlock();
231            }
232        }
233    
234        private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) {
235            adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId);
236        }
237    
238        public void removeAllMessages(ConnectionContext context) throws IOException {
239            lock.lock();
240            try {
241                Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
242                MessageAck ack = new MessageAck();
243                for (MessageId id:tmpSet) {
244                    ack.setLastMessageId(id);
245                    removeMessage(null, ack);
246                }
247                resetBatching();
248                messageContainer.clear();
249            }finally {
250                lock.unlock();
251            }
252        }
253    
254        public void delete() {
255            lock.lock();
256            try {
257                messageContainer.clear();
258            }finally {
259                lock.unlock();
260            }
261        }
262    
263        public void resetBatching() {
264            lock.lock();
265            try {
266                batchEntry = null;
267                lastBatchId = null;
268            }finally {
269                lock.unlock();
270            }
271        }
272    
273        public int getMessageCount() {
274            return messageContainer.size();
275        }
276    
277        public boolean isSupportForCursors() {
278            return true;
279        }
280    
281        public boolean supportsExternalBatchControl() {
282            return true;
283        }
284    
285        void removeInterest(ReferenceRecord rr) {
286            adapter.removeInterestInRecordFile(rr.getData().getFileId());
287        }
288    
289        void addInterest(ReferenceRecord rr) {
290            adapter.addInterestInRecordFile(rr.getData().getFileId());
291        }
292    
293        /**
294         * @param startAfter
295         * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
296         */
297        public void setBatch(MessageId startAfter) {
298            lock.lock();
299            try {
300                batchEntry = messageContainer.getEntry(startAfter);
301                if (LOG.isDebugEnabled()) {
302                    LOG.debug("setBatch: " + startAfter);
303                }
304            } finally {
305                lock.unlock();
306            }
307        }
308    }