001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.Destination;
027    import org.apache.activemq.broker.region.IndirectMessageReference;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.QueueMessageReference;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032    import org.apache.activemq.openwire.OpenWireFormat;
033    import org.apache.activemq.store.kahadb.plist.PList;
034    import org.apache.activemq.store.kahadb.plist.PListEntry;
035    import org.apache.activemq.store.kahadb.plist.PListStore;
036    import org.apache.activemq.usage.SystemUsage;
037    import org.apache.activemq.usage.Usage;
038    import org.apache.activemq.usage.UsageListener;
039    import org.apache.activemq.wireformat.WireFormat;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    import org.apache.kahadb.util.ByteSequence;
043    
044    /**
045     * persist pending messages pending message (messages awaiting dispatch to a
046     * consumer) cursor
047     * 
048     * 
049     */
050    public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
051        static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
052        private static final AtomicLong NAME_COUNT = new AtomicLong();
053        protected Broker broker;
054        private final PListStore store;
055        private final String name;
056        private PendingList memoryList;
057        private PList diskList;
058        private Iterator<MessageReference> iter;
059        private Destination regionDestination;
060        private boolean iterating;
061        private boolean flushRequired;
062        private final AtomicBoolean started = new AtomicBoolean();
063        private final WireFormat wireFormat = new OpenWireFormat();
064        /**
065         * @param broker
066         * @param name
067         * @param prioritizedMessages
068         */
069        public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
070            super(prioritizedMessages);
071            if (this.prioritizedMessages) {
072                this.memoryList = new PrioritizedPendingList();
073            } else {
074                this.memoryList = new OrderedPendingList();
075            }
076            this.broker = broker;
077            // the store can be null if the BrokerService has persistence
078            // turned off
079            this.store = broker.getTempDataStore();
080            this.name = NAME_COUNT.incrementAndGet() + "_" + name;
081        }
082    
083        @Override
084        public void start() throws Exception {
085            if (started.compareAndSet(false, true)) {
086                super.start();
087                if (systemUsage != null) {
088                    systemUsage.getMemoryUsage().addUsageListener(this);
089                }
090            }
091        }
092    
093        @Override
094        public void stop() throws Exception {
095            if (started.compareAndSet(true, false)) {
096                super.stop();
097                if (systemUsage != null) {
098                    systemUsage.getMemoryUsage().removeUsageListener(this);
099                }
100            }
101        }
102    
103        /**
104         * @return true if there are no pending messages
105         */
106        @Override
107        public synchronized boolean isEmpty() {
108            if (memoryList.isEmpty() && isDiskListEmpty()) {
109                return true;
110            }
111            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
112                MessageReference node = iterator.next();
113                if (node == QueueMessageReference.NULL_MESSAGE) {
114                    continue;
115                }
116                if (!node.isDropped()) {
117                    return false;
118                }
119                // We can remove dropped references.
120                iterator.remove();
121            }
122            return isDiskListEmpty();
123        }
124    
125        /**
126         * reset the cursor
127         */
128        @Override
129        public synchronized void reset() {
130            iterating = true;
131            last = null;
132            if (isDiskListEmpty()) {
133                this.iter = this.memoryList.iterator();
134            } else {
135                this.iter = new DiskIterator();
136            }
137        }
138    
139        @Override
140        public synchronized void release() {
141            iterating = false;
142            if (iter instanceof DiskIterator) {
143               ((DiskIterator)iter).release();
144            };
145            if (flushRequired) {
146                flushRequired = false;
147                if (!hasSpace()) {
148                    flushToDisk();
149                }
150            }
151        }
152    
153        @Override
154        public synchronized void destroy() throws Exception {
155            stop();
156            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
157                Message node = (Message) i.next();
158                node.decrementReferenceCount();
159            }
160            memoryList.clear();
161            destroyDiskList();
162        }
163    
164        private void destroyDiskList() throws Exception {
165            if (diskList != null) {
166                store.removePList(name);
167                diskList = null;
168            }
169        }
170    
171        @Override
172        public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
173            LinkedList<MessageReference> result = new LinkedList<MessageReference>();
174            int count = 0;
175            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
176                MessageReference ref = i.next();
177                ref.incrementReferenceCount();
178                result.add(ref);
179                count++;
180            }
181            if (count < maxItems && !isDiskListEmpty()) {
182                for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
183                    Message message = (Message) i.next();
184                    message.setRegionDestination(regionDestination);
185                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
186                    message.incrementReferenceCount();
187                    result.add(message);
188                    count++;
189                }
190            }
191            return result;
192        }
193    
194        /**
195         * add message to await dispatch
196         * 
197         * @param node
198         * @throws Exception 
199         */
200        @Override
201        public synchronized void addMessageLast(MessageReference node) throws Exception {
202            tryAddMessageLast(node, 0);
203        }
204        
205        @Override
206        public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
207            if (!node.isExpired()) {
208                try {
209                    regionDestination = node.getMessage().getRegionDestination();
210                    if (isDiskListEmpty()) {
211                        if (hasSpace() || this.store == null) {
212                            memoryList.addMessageLast(node);
213                            node.incrementReferenceCount();
214                            setCacheEnabled(true);
215                            return true;
216                        }
217                    }
218                    if (!hasSpace()) {
219                        if (isDiskListEmpty()) {
220                            expireOldMessages();
221                            if (hasSpace()) {
222                                memoryList.addMessageLast(node);
223                                node.incrementReferenceCount();
224                                return true;
225                            } else {
226                                flushToDisk();
227                            }
228                        }
229                    }
230                    if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
231                        ByteSequence bs = getByteSequence(node.getMessage());
232                        getDiskList().addLast(node.getMessageId().toString(), bs);
233                        return true;
234                    }
235                    return false;
236    
237                } catch (Exception e) {
238                    LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
239                    throw new RuntimeException(e);
240                }
241            } else {
242                discardExpiredMessage(node);
243            }
244            //message expired
245            return true;
246        }
247    
248        /**
249         * add message to await dispatch
250         * 
251         * @param node
252         */
253        @Override
254        public synchronized void addMessageFirst(MessageReference node) {
255            if (!node.isExpired()) {
256                try {
257                    regionDestination = node.getMessage().getRegionDestination();
258                    if (isDiskListEmpty()) {
259                        if (hasSpace()) {
260                            memoryList.addMessageFirst(node);
261                            node.incrementReferenceCount();
262                            setCacheEnabled(true);
263                            return;
264                        }
265                    }
266                    if (!hasSpace()) {
267                        if (isDiskListEmpty()) {
268                            expireOldMessages();
269                            if (hasSpace()) {
270                                memoryList.addMessageFirst(node);
271                                node.incrementReferenceCount();
272                                return;
273                            } else {
274                                flushToDisk();
275                            }
276                        }
277                    }
278                    systemUsage.getTempUsage().waitForSpace();
279                    node.decrementReferenceCount();
280                    ByteSequence bs = getByteSequence(node.getMessage());
281                    getDiskList().addFirst(node.getMessageId().toString(), bs);
282    
283                } catch (Exception e) {
284                    LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
285                    throw new RuntimeException(e);
286                }
287            } else {
288                discardExpiredMessage(node);
289            }
290        }
291    
292        /**
293         * @return true if there pending messages to dispatch
294         */
295        @Override
296        public synchronized boolean hasNext() {
297            return iter.hasNext();
298        }
299    
300        /**
301         * @return the next pending message
302         */
303        @Override
304        public synchronized MessageReference next() {
305            MessageReference reference = iter.next();
306            last = reference;
307            if (!isDiskListEmpty()) {
308                // got from disk
309                reference.getMessage().setRegionDestination(regionDestination);
310                reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
311            }
312            reference.incrementReferenceCount();
313            return reference;
314        }
315    
316        /**
317         * remove the message at the cursor position
318         */
319        @Override
320        public synchronized void remove() {
321            iter.remove();
322            if (last != null) {
323                last.decrementReferenceCount();
324            }
325        }
326    
327        /**
328         * @param node
329         * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
330         */
331        @Override
332        public synchronized void remove(MessageReference node) {
333            if (memoryList.remove(node) != null) {
334                node.decrementReferenceCount();
335            }
336            if (!isDiskListEmpty()) {
337                try {
338                    getDiskList().remove(node.getMessageId().toString());
339                } catch (IOException e) {
340                    throw new RuntimeException(e);
341                }
342            }
343        }
344    
345        /**
346         * @return the number of pending messages
347         */
348        @Override
349        public synchronized int size() {
350            return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
351        }
352    
353        /**
354         * clear all pending messages
355         */
356        @Override
357        public synchronized void clear() {
358            memoryList.clear();
359            if (!isDiskListEmpty()) {
360                try {
361                    getDiskList().destroy();
362                } catch (IOException e) {
363                    throw new RuntimeException(e);
364                }
365            }
366            last = null;
367        }
368    
369        @Override
370        public synchronized boolean isFull() {
371    
372            return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
373    
374        }
375    
376        @Override
377        public boolean hasMessagesBufferedToDeliver() {
378            return !isEmpty();
379        }
380    
381        @Override
382        public void setSystemUsage(SystemUsage usageManager) {
383            super.setSystemUsage(usageManager);
384        }
385    
386        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
387            if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
388                synchronized (this) {
389                    if (!flushRequired && size() != 0) {
390                        flushRequired =true;
391                        if (!iterating) {
392                            expireOldMessages();
393                            if (!hasSpace()) {
394                                flushToDisk();
395                                flushRequired = false;
396                            }
397                        }
398                    }
399                }
400            }
401        }
402    
403        @Override
404        public boolean isTransient() {
405            return true;
406        }
407    
408        protected boolean isSpaceInMemoryList() {
409            return hasSpace() && isDiskListEmpty();
410        }
411    
412        protected synchronized void expireOldMessages() {
413            if (!memoryList.isEmpty()) {
414                for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
415                    MessageReference node = iterator.next();
416                    if (node.isExpired()) {
417                        node.decrementReferenceCount();
418                        discardExpiredMessage(node);
419                        iterator.remove();
420                    }
421                }
422            }
423        }
424    
425        protected synchronized void flushToDisk() {
426            if (!memoryList.isEmpty() && store != null) {
427                long start = 0;
428                 if (LOG.isTraceEnabled()) {
429                    start = System.currentTimeMillis();
430                    LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size()  + " " +  (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
431                 }
432                for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
433                    MessageReference node = iterator.next();
434                    node.decrementReferenceCount();
435                    ByteSequence bs;
436                    try {
437                        bs = getByteSequence(node.getMessage());
438                        getDiskList().addLast(node.getMessageId().toString(), bs);
439                    } catch (IOException e) {
440                        LOG.error("Failed to write to disk list", e);
441                        throw new RuntimeException(e);
442                    }
443    
444                }
445                memoryList.clear();
446                setCacheEnabled(false);
447                 if (LOG.isTraceEnabled()) {
448                    LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
449                 }
450            }
451        }
452    
453        protected boolean isDiskListEmpty() {
454            return diskList == null || diskList.isEmpty();
455        }
456    
457        protected PList getDiskList() {
458            if (diskList == null) {
459                try {
460                    diskList = store.getPList(name);
461                } catch (Exception e) {
462                    LOG.error("Caught an IO Exception getting the DiskList " + name, e);
463                    throw new RuntimeException(e);
464                }
465            }
466            return diskList;
467        }
468    
469        private void discardExpiredMessage(MessageReference reference) {
470            if (LOG.isDebugEnabled()) {
471                LOG.debug("Discarding expired message " + reference);
472            }
473            if (broker.isExpired(reference)) {
474                ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
475                context.setBroker(broker);
476                reference.getRegionDestination().messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
477            }
478        }
479    
480        protected ByteSequence getByteSequence(Message message) throws IOException {
481            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
482            return new ByteSequence(packet.data, packet.offset, packet.length);
483        }
484    
485        protected Message getMessage(ByteSequence bs) throws IOException {
486            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
487                    .getOffset(), bs.getLength());
488            return (Message) this.wireFormat.unmarshal(packet);
489    
490        }
491    
492        final class DiskIterator implements Iterator<MessageReference> {
493            private final PList.PListIterator iterator;
494            DiskIterator() {
495                try {
496                    iterator = getDiskList().iterator();
497                } catch (Exception e) {
498                    throw new RuntimeException(e);
499                }
500            }
501    
502            public boolean hasNext() {
503                return iterator.hasNext();
504            }
505    
506            public MessageReference next() {
507                try {
508                    PListEntry entry = iterator.next();
509                    return getMessage(entry.getByteSequence());
510                } catch (IOException e) {
511                    LOG.error("I/O error", e);
512                    throw new RuntimeException(e);
513                }
514            }
515    
516            public void remove() {
517                iterator.remove();
518            }
519    
520            public void release() {
521                iterator.release();
522            }
523        }
524    }