001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.Destination;
027    import org.apache.activemq.broker.region.IndirectMessageReference;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.QueueMessageReference;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032    import org.apache.activemq.openwire.OpenWireFormat;
033    import org.apache.activemq.store.PList;
034    import org.apache.activemq.store.PListStore;
035    import org.apache.activemq.store.PListEntry;
036    import org.apache.activemq.usage.SystemUsage;
037    import org.apache.activemq.usage.Usage;
038    import org.apache.activemq.usage.UsageListener;
039    import org.apache.activemq.wireformat.WireFormat;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    import org.apache.activemq.util.ByteSequence;
043    
044    /**
045     * persist pending messages pending message (messages awaiting dispatch to a
046     * consumer) cursor
047     * 
048     * 
049     */
050    public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
051        static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
052        private static final AtomicLong NAME_COUNT = new AtomicLong();
053        protected Broker broker;
054        private final PListStore store;
055        private final String name;
056        private PendingList memoryList;
057        private PList diskList;
058        private Iterator<MessageReference> iter;
059        private Destination regionDestination;
060        private boolean iterating;
061        private boolean flushRequired;
062        private final AtomicBoolean started = new AtomicBoolean();
063        private final WireFormat wireFormat = new OpenWireFormat();
064        /**
065         * @param broker
066         * @param name
067         * @param prioritizedMessages
068         */
069        public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
070            super(prioritizedMessages);
071            if (this.prioritizedMessages) {
072                this.memoryList = new PrioritizedPendingList();
073            } else {
074                this.memoryList = new OrderedPendingList();
075            }
076            this.broker = broker;
077            // the store can be null if the BrokerService has persistence
078            // turned off
079            this.store = broker.getTempDataStore();
080            this.name = NAME_COUNT.incrementAndGet() + "_" + name;
081        }
082    
083        @Override
084        public void start() throws Exception {
085            if (started.compareAndSet(false, true)) {
086                super.start();
087                if (systemUsage != null) {
088                    systemUsage.getMemoryUsage().addUsageListener(this);
089                }
090            }
091        }
092    
093        @Override
094        public void stop() throws Exception {
095            if (started.compareAndSet(true, false)) {
096                super.stop();
097                if (systemUsage != null) {
098                    systemUsage.getMemoryUsage().removeUsageListener(this);
099                }
100            }
101        }
102    
103        /**
104         * @return true if there are no pending messages
105         */
106        @Override
107        public synchronized boolean isEmpty() {
108            if (memoryList.isEmpty() && isDiskListEmpty()) {
109                return true;
110            }
111            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
112                MessageReference node = iterator.next();
113                if (node == QueueMessageReference.NULL_MESSAGE) {
114                    continue;
115                }
116                if (!node.isDropped()) {
117                    return false;
118                }
119                // We can remove dropped references.
120                iterator.remove();
121            }
122            return isDiskListEmpty();
123        }
124    
125        /**
126         * reset the cursor
127         */
128        @Override
129        public synchronized void reset() {
130            iterating = true;
131            last = null;
132            if (isDiskListEmpty()) {
133                this.iter = this.memoryList.iterator();
134            } else {
135                this.iter = new DiskIterator();
136            }
137        }
138    
139        @Override
140        public synchronized void release() {
141            iterating = false;
142            if (iter instanceof DiskIterator) {
143               ((DiskIterator)iter).release();
144            };
145            if (flushRequired) {
146                flushRequired = false;
147                if (!hasSpace()) {
148                    flushToDisk();
149                }
150            }
151            // ensure any memory ref is released
152            iter = null;
153        }
154    
155        @Override
156        public synchronized void destroy() throws Exception {
157            stop();
158            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
159                MessageReference node = i.next();
160                node.decrementReferenceCount();
161            }
162            memoryList.clear();
163            destroyDiskList();
164        }
165    
166        private void destroyDiskList() throws Exception {
167            if (diskList != null) {
168                store.removePList(name);
169                diskList = null;
170            }
171        }
172    
173        @Override
174        public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
175            LinkedList<MessageReference> result = new LinkedList<MessageReference>();
176            int count = 0;
177            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
178                MessageReference ref = i.next();
179                ref.incrementReferenceCount();
180                result.add(ref);
181                count++;
182            }
183            if (count < maxItems && !isDiskListEmpty()) {
184                for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
185                    Message message = (Message) i.next();
186                    message.setRegionDestination(regionDestination);
187                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
188                    message.incrementReferenceCount();
189                    result.add(message);
190                    count++;
191                }
192            }
193            return result;
194        }
195    
196        /**
197         * add message to await dispatch
198         * 
199         * @param node
200         * @throws Exception 
201         */
202        @Override
203        public synchronized void addMessageLast(MessageReference node) throws Exception {
204            tryAddMessageLast(node, 0);
205        }
206        
207        @Override
208        public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
209            if (!node.isExpired()) {
210                try {
211                    regionDestination = (Destination) node.getMessage().getRegionDestination();
212                    if (isDiskListEmpty()) {
213                        if (hasSpace() || this.store == null) {
214                            memoryList.addMessageLast(node);
215                            node.incrementReferenceCount();
216                            setCacheEnabled(true);
217                            return true;
218                        }
219                    }
220                    if (!hasSpace()) {
221                        if (isDiskListEmpty()) {
222                            expireOldMessages();
223                            if (hasSpace()) {
224                                memoryList.addMessageLast(node);
225                                node.incrementReferenceCount();
226                                return true;
227                            } else {
228                                flushToDisk();
229                            }
230                        }
231                    }
232                    if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
233                        ByteSequence bs = getByteSequence(node.getMessage());
234                        getDiskList().addLast(node.getMessageId().toString(), bs);
235                        return true;
236                    }
237                    return false;
238    
239                } catch (Exception e) {
240                    LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
241                    throw new RuntimeException(e);
242                }
243            } else {
244                discardExpiredMessage(node);
245            }
246            //message expired
247            return true;
248        }
249    
250        /**
251         * add message to await dispatch
252         * 
253         * @param node
254         */
255        @Override
256        public synchronized void addMessageFirst(MessageReference node) {
257            if (!node.isExpired()) {
258                try {
259                    regionDestination = (Destination) node.getMessage().getRegionDestination();
260                    if (isDiskListEmpty()) {
261                        if (hasSpace()) {
262                            memoryList.addMessageFirst(node);
263                            node.incrementReferenceCount();
264                            setCacheEnabled(true);
265                            return;
266                        }
267                    }
268                    if (!hasSpace()) {
269                        if (isDiskListEmpty()) {
270                            expireOldMessages();
271                            if (hasSpace()) {
272                                memoryList.addMessageFirst(node);
273                                node.incrementReferenceCount();
274                                return;
275                            } else {
276                                flushToDisk();
277                            }
278                        }
279                    }
280                    systemUsage.getTempUsage().waitForSpace();
281                    node.decrementReferenceCount();
282                    ByteSequence bs = getByteSequence(node.getMessage());
283                    Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
284                    node.getMessageId().setPlistLocator(locator);
285    
286                } catch (Exception e) {
287                    LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
288                    throw new RuntimeException(e);
289                }
290            } else {
291                discardExpiredMessage(node);
292            }
293        }
294    
295        /**
296         * @return true if there pending messages to dispatch
297         */
298        @Override
299        public synchronized boolean hasNext() {
300            return iter.hasNext();
301        }
302    
303        /**
304         * @return the next pending message
305         */
306        @Override
307        public synchronized MessageReference next() {
308            MessageReference reference = iter.next();
309            last = reference;
310            if (!isDiskListEmpty()) {
311                // got from disk
312                reference.getMessage().setRegionDestination(regionDestination);
313                reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
314            }
315            reference.incrementReferenceCount();
316            return reference;
317        }
318    
319        /**
320         * remove the message at the cursor position
321         */
322        @Override
323        public synchronized void remove() {
324            iter.remove();
325            if (last != null) {
326                last.decrementReferenceCount();
327            }
328        }
329    
330        /**
331         * @param node
332         * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
333         */
334        @Override
335        public synchronized void remove(MessageReference node) {
336            if (memoryList.remove(node) != null) {
337                node.decrementReferenceCount();
338            }
339            if (!isDiskListEmpty()) {
340                try {
341                    getDiskList().remove(node.getMessageId().getPlistLocator());
342                } catch (IOException e) {
343                    throw new RuntimeException(e);
344                }
345            }
346        }
347    
348        /**
349         * @return the number of pending messages
350         */
351        @Override
352        public synchronized int size() {
353            return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
354        }
355    
356        /**
357         * clear all pending messages
358         */
359        @Override
360        public synchronized void clear() {
361            memoryList.clear();
362            if (!isDiskListEmpty()) {
363                try {
364                    getDiskList().destroy();
365                } catch (IOException e) {
366                    throw new RuntimeException(e);
367                }
368            }
369            last = null;
370        }
371    
372        @Override
373        public synchronized boolean isFull() {
374    
375            return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
376    
377        }
378    
379        @Override
380        public boolean hasMessagesBufferedToDeliver() {
381            return !isEmpty();
382        }
383    
384        @Override
385        public void setSystemUsage(SystemUsage usageManager) {
386            super.setSystemUsage(usageManager);
387        }
388    
389        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
390            if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
391                synchronized (this) {
392                    if (!flushRequired && size() != 0) {
393                        flushRequired =true;
394                        if (!iterating) {
395                            expireOldMessages();
396                            if (!hasSpace()) {
397                                flushToDisk();
398                                flushRequired = false;
399                            }
400                        }
401                    }
402                }
403            }
404        }
405    
406        @Override
407        public boolean isTransient() {
408            return true;
409        }
410    
411        protected boolean isSpaceInMemoryList() {
412            return hasSpace() && isDiskListEmpty();
413        }
414    
415        protected synchronized void expireOldMessages() {
416            if (!memoryList.isEmpty()) {
417                for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
418                    MessageReference node = iterator.next();
419                    if (node.isExpired()) {
420                        node.decrementReferenceCount();
421                        discardExpiredMessage(node);
422                        iterator.remove();
423                    }
424                }
425            }
426        }
427    
428        protected synchronized void flushToDisk() {
429            if (!memoryList.isEmpty() && store != null) {
430                long start = 0;
431                 if (LOG.isTraceEnabled()) {
432                    start = System.currentTimeMillis();
433                    LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size()  + " " +  (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
434                 }
435                for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
436                    MessageReference node = iterator.next();
437                    node.decrementReferenceCount();
438                    ByteSequence bs;
439                    try {
440                        bs = getByteSequence(node.getMessage());
441                        getDiskList().addLast(node.getMessageId().toString(), bs);
442                    } catch (IOException e) {
443                        LOG.error("Failed to write to disk list", e);
444                        throw new RuntimeException(e);
445                    }
446    
447                }
448                memoryList.clear();
449                setCacheEnabled(false);
450                 if (LOG.isTraceEnabled()) {
451                    LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
452                 }
453            }
454        }
455    
456        protected boolean isDiskListEmpty() {
457            return diskList == null || diskList.isEmpty();
458        }
459    
460        public PList getDiskList() {
461            if (diskList == null) {
462                try {
463                    diskList = store.getPList(name);
464                } catch (Exception e) {
465                    LOG.error("Caught an IO Exception getting the DiskList " + name, e);
466                    throw new RuntimeException(e);
467                }
468            }
469            return diskList;
470        }
471    
472        private void discardExpiredMessage(MessageReference reference) {
473            if (LOG.isDebugEnabled()) {
474                LOG.debug("Discarding expired message " + reference);
475            }
476            if (broker.isExpired(reference)) {
477                ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
478                context.setBroker(broker);
479                ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
480            }
481        }
482    
483        protected ByteSequence getByteSequence(Message message) throws IOException {
484            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
485            return new ByteSequence(packet.data, packet.offset, packet.length);
486        }
487    
488        protected Message getMessage(ByteSequence bs) throws IOException {
489            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
490                    .getOffset(), bs.getLength());
491            return (Message) this.wireFormat.unmarshal(packet);
492    
493        }
494    
495        final class DiskIterator implements Iterator<MessageReference> {
496            private final PList.PListIterator iterator;
497            DiskIterator() {
498                try {
499                    iterator = getDiskList().iterator();
500                } catch (Exception e) {
501                    throw new RuntimeException(e);
502                }
503            }
504    
505            public boolean hasNext() {
506                return iterator.hasNext();
507            }
508    
509            public MessageReference next() {
510                try {
511                    PListEntry entry = iterator.next();
512                    Message message = getMessage(entry.getByteSequence());
513                    message.getMessageId().setPlistLocator(entry.getLocator());
514                    return message;
515                } catch (IOException e) {
516                    LOG.error("I/O error", e);
517                    throw new RuntimeException(e);
518                }
519            }
520    
521            public void remove() {
522                iterator.remove();
523            }
524    
525            public void release() {
526                iterator.release();
527            }
528        }
529    }