001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.disk.util;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.NoSuchElementException;
026    
027    /**
028     * Keeps track of a added long values. Collapses ranges of numbers using a
029     * Sequence representation. Use to keep track of received message ids to find
030     * out if a message is duplicate or if there are any missing messages.
031     *
032     * @author chirino
033     */
034    public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
035    
036        public static class Marshaller implements org.apache.activemq.store.kahadb.disk.util.Marshaller<SequenceSet> {
037    
038            public static final Marshaller INSTANCE = new Marshaller();
039    
040            public SequenceSet readPayload(DataInput in) throws IOException {
041                SequenceSet value = new SequenceSet();
042                int count = in.readInt();
043                for (int i = 0; i < count; i++) {
044                    if( in.readBoolean() ) {
045                        Sequence sequence = new Sequence(in.readLong(), in.readLong());
046                        value.addLast(sequence);
047                    } else {
048                        Sequence sequence = new Sequence(in.readLong());
049                        value.addLast(sequence);
050                    }
051                }
052                return value;
053            }
054    
055            public void writePayload(SequenceSet value, DataOutput out) throws IOException {
056                out.writeInt(value.size());
057                Sequence sequence = value.getHead();
058                while (sequence != null ) {
059                    if( sequence.range() > 1 ) {
060                        out.writeBoolean(true);
061                        out.writeLong(sequence.first);
062                        out.writeLong(sequence.last);
063                    } else {
064                        out.writeBoolean(false);
065                        out.writeLong(sequence.first);
066                    }
067                    sequence = sequence.getNext();
068                }
069            }
070    
071            public int getFixedSize() {
072                return -1;
073            }
074    
075            public SequenceSet deepCopy(SequenceSet value) {
076                SequenceSet rc = new SequenceSet();
077                Sequence sequence = value.getHead();
078                while (sequence != null ) {
079                    rc.add(new Sequence(sequence.first, sequence.last));
080                    sequence = sequence.getNext();
081                }
082                return rc;
083            }
084    
085            public boolean isDeepCopySupported() {
086                return true;
087            }
088        }
089    
090        public void add(Sequence value) {
091            // TODO we can probably optimize this a bit
092            for(long i=value.first; i<value.last+1; i++) {
093                add(i);
094            }
095        }
096    
097        /**
098         *
099         * @param value
100         *            the value to add to the list
101         * @return false if the value was a duplicate.
102         */
103        public boolean add(long value) {
104    
105            if (isEmpty()) {
106                addFirst(new Sequence(value));
107                return true;
108            }
109    
110            // check for append
111            Sequence sequence = getTail();
112            if (sequence.isAdjacentToLast(value)) {
113                sequence.last = value;
114                return true;
115            }
116    
117            sequence = getHead();
118            while (sequence != null) {
119    
120                if (sequence.isAdjacentToLast(value)) {
121                    // grow the sequence...
122                    sequence.last = value;
123                    // it might connect us to the next sequence..
124                    if (sequence.getNext() != null) {
125                        Sequence next = sequence.getNext();
126                        if (next.isAdjacentToFirst(value)) {
127                            // Yep the sequence connected.. so join them.
128                            sequence.last = next.last;
129                            next.unlink();
130                        }
131                    }
132                    return true;
133                }
134    
135                if (sequence.isAdjacentToFirst(value)) {
136                    // grow the sequence...
137                    sequence.first = value;
138    
139                    // it might connect us to the previous
140                    if (sequence.getPrevious() != null) {
141                        Sequence prev = sequence.getPrevious();
142                        if (prev.isAdjacentToLast(value)) {
143                            // Yep the sequence connected.. so join them.
144                            sequence.first = prev.first;
145                            prev.unlink();
146                        }
147                    }
148                    return true;
149                }
150    
151                // Did that value land before this sequence?
152                if (value < sequence.first) {
153                    // Then insert a new entry before this sequence item.
154                    sequence.linkBefore(new Sequence(value));
155                    return true;
156                }
157    
158                // Did that value land within the sequence? The it's a duplicate.
159                if (sequence.contains(value)) {
160                    return false;
161                }
162    
163                sequence = sequence.getNext();
164            }
165    
166            // Then the value is getting appended to the tail of the sequence.
167            addLast(new Sequence(value));
168            return true;
169        }
170    
171        /**
172         * Removes the given value from the Sequence set, splitting a
173         * contained sequence if necessary.
174         *
175         * @param value
176         *          The value that should be removed from the SequenceSet.
177         *
178         * @return true if the value was removed from the set, false if there
179         *         was no sequence in the set that contained the given value.
180         */
181        public boolean remove(long value) {
182            Sequence sequence = getHead();
183            while (sequence != null ) {
184                if(sequence.contains(value)) {
185                    if (sequence.range() == 1) {
186                        sequence.unlink();
187                        return true;
188                    } else if (sequence.getFirst() == value) {
189                        sequence.setFirst(value+1);
190                        return true;
191                    } else if (sequence.getLast() == value) {
192                        sequence.setLast(value-1);
193                        return true;
194                    } else {
195                        sequence.linkBefore(new Sequence(sequence.first, value-1));
196                        sequence.linkAfter(new Sequence(value+1, sequence.last));
197                        sequence.unlink();
198                        return true;
199                    }
200                }
201    
202                sequence = sequence.getNext();
203            }
204    
205            return false;
206        }
207    
208        /**
209         * Removes and returns the first element from this list.
210         *
211         * @return the first element from this list.
212         * @throws NoSuchElementException if this list is empty.
213         */
214        public long removeFirst() {
215            if (isEmpty()) {
216                throw new NoSuchElementException();
217            }
218    
219            Sequence rc = removeFirstSequence(1);
220            return rc.first;
221        }
222    
223        /**
224         * Removes and returns the last sequence from this list.
225         *
226         * @return the last sequence from this list or null if the list is empty.
227         */
228        public Sequence removeLastSequence() {
229            if (isEmpty()) {
230                return null;
231            }
232    
233            Sequence rc = getTail();
234            rc.unlink();
235            return rc;
236        }
237    
238        /**
239         * Removes and returns the first sequence that is count range large.
240         *
241         * @return a sequence that is count range large, or null if no sequence is that large in the list.
242         */
243        public Sequence removeFirstSequence(long count) {
244            if (isEmpty()) {
245                return null;
246            }
247    
248            Sequence sequence = getHead();
249            while (sequence != null ) {
250                if (sequence.range() == count ) {
251                    sequence.unlink();
252                    return sequence;
253                }
254                if (sequence.range() > count ) {
255                    Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
256                    sequence.first+=count;
257                    return rc;
258                }
259                sequence = sequence.getNext();
260            }
261            return null;
262        }
263    
264        /**
265         * @return all the id Sequences that are missing from this set that are not
266         *         in between the range provided.
267         */
268        public List<Sequence> getMissing(long first, long last) {
269            ArrayList<Sequence> rc = new ArrayList<Sequence>();
270            if (first > last) {
271                throw new IllegalArgumentException("First cannot be more than last");
272            }
273            if (isEmpty()) {
274                // We are missing all the messages.
275                rc.add(new Sequence(first, last));
276                return rc;
277            }
278    
279            Sequence sequence = getHead();
280            while (sequence != null && first <= last) {
281                if (sequence.contains(first)) {
282                    first = sequence.last + 1;
283                } else {
284                    if (first < sequence.first) {
285                        if (last < sequence.first) {
286                            rc.add(new Sequence(first, last));
287                            return rc;
288                        } else {
289                            rc.add(new Sequence(first, sequence.first - 1));
290                            first = sequence.last + 1;
291                        }
292                    }
293                }
294                sequence = sequence.getNext();
295            }
296    
297            if (first <= last) {
298                rc.add(new Sequence(first, last));
299            }
300            return rc;
301        }
302    
303        /**
304         * @return all the Sequence that are in this list
305         */
306        public List<Sequence> getReceived() {
307            ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
308            Sequence sequence = getHead();
309            while (sequence != null) {
310                rc.add(new Sequence(sequence.first, sequence.last));
311                sequence = sequence.getNext();
312            }
313            return rc;
314        }
315    
316        /**
317         * Returns true if the value given is contained within one of the
318         * sequences held in this set.
319         *
320         * @param value
321         *      The value to search for in the set.
322         *
323         * @return true if the value is contained in the set.
324         */
325        public boolean contains(long value) {
326            if (isEmpty()) {
327                return false;
328            }
329    
330            Sequence sequence = getHead();
331            while (sequence != null) {
332                if (sequence.contains(value)) {
333                    return true;
334                }
335                sequence = sequence.getNext();
336            }
337    
338            return false;
339        }
340    
341        public boolean contains(int first, int last) {
342            if (isEmpty()) {
343                return false;
344            }
345            Sequence sequence = getHead();
346            while (sequence != null) {
347                if (sequence.first <= first && first <= sequence.last ) {
348                    return last <= sequence.last;
349                }
350                sequence = sequence.getNext();
351            }
352            return false;
353        }
354    
355        /**
356         * Computes the size of this Sequence by summing the values of all
357         * the contained sequences.
358         *
359         * @return the total number of values contained in this set if it
360         *         were to be iterated over like an array.
361         */
362        public long rangeSize() {
363            long result = 0;
364            Sequence sequence = getHead();
365            while (sequence != null) {
366                result += sequence.range();
367                sequence = sequence.getNext();
368            }
369            return result;
370        }
371    
372        public Iterator<Long> iterator() {
373            return new SequenceIterator();
374        }
375    
376        private class SequenceIterator implements Iterator<Long> {
377    
378            private Sequence currentEntry;
379            private long lastReturned = -1;
380    
381            public SequenceIterator() {
382                currentEntry = getHead();
383                if (currentEntry != null) {
384                    lastReturned = currentEntry.first - 1;
385                }
386            }
387    
388            public boolean hasNext() {
389                return currentEntry != null;
390            }
391    
392            public Long next() {
393                if (currentEntry == null) {
394                    throw new NoSuchElementException();
395                }
396    
397                if(lastReturned < currentEntry.first) {
398                    lastReturned = currentEntry.first;
399                    if (currentEntry.range() == 1) {
400                        currentEntry = currentEntry.getNext();
401                    }
402                } else {
403                    lastReturned++;
404                    if (lastReturned == currentEntry.last) {
405                        currentEntry = currentEntry.getNext();
406                    }
407                }
408    
409                return lastReturned;
410            }
411    
412            public void remove() {
413                throw new UnsupportedOperationException();
414            }
415    
416        }
417    
418    }