001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.Serializable;
020    
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.command.MessageId;
026    import org.apache.activemq.command.ProducerId;
027    import org.apache.activemq.util.BitArrayBin;
028    import org.apache.activemq.util.IdGenerator;
029    import org.apache.activemq.util.LRUCache;
030    
031    /**
032     * Provides basic audit functions for Messages without sync
033     *
034     *
035     */
036    public class ActiveMQMessageAuditNoSync implements Serializable {
037    
038        private static final long serialVersionUID = 1L;
039    
040        public static final int DEFAULT_WINDOW_SIZE = 2048;
041        public static final int MAXIMUM_PRODUCER_COUNT = 64;
042        private int auditDepth;
043        private int maximumNumberOfProducersToTrack;
044        private final LRUCache<Object, BitArrayBin> map;
045        private transient boolean modified = true;
046    
047        /**
048         * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64
049         */
050        public ActiveMQMessageAuditNoSync() {
051            this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052        }
053    
054        /**
055         * Construct a MessageAudit
056         *
057         * @param auditDepth range of ids to track
058         * @param maximumNumberOfProducersToTrack number of producers expected in the system
059         */
060        public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
061            this.auditDepth = auditDepth;
062            this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
063            this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
064        }
065    
066        /**
067         * @return the auditDepth
068         */
069        public int getAuditDepth() {
070            return auditDepth;
071        }
072    
073        /**
074         * @param auditDepth the auditDepth to set
075         */
076        public void setAuditDepth(int auditDepth) {
077            this.auditDepth = auditDepth;
078            this.modified = true;
079        }
080    
081        /**
082         * @return the maximumNumberOfProducersToTrack
083         */
084        public int getMaximumNumberOfProducersToTrack() {
085            return maximumNumberOfProducersToTrack;
086        }
087    
088        /**
089         * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090         */
091        public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) {
092    
093            if (maximumNumberOfProducersToTrack < this.maximumNumberOfProducersToTrack){
094                LRUCache<Object, BitArrayBin> newMap = new LRUCache<Object, BitArrayBin>(0,maximumNumberOfProducersToTrack,0.75f,true);
095                /**
096                 * As putAll will access the entries in the right order,
097                 * this shouldn't result in wrong cache entries being removed
098                 */
099                newMap.putAll(this.map);
100                this.map.clear();
101                this.map.putAll(newMap);
102            }
103            this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
104            this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
105            this.modified = true;
106        }
107    
108        /**
109         * Checks if this message has been seen before
110         *
111         * @param message
112         * @return true if the message is a duplicate
113         * @throws JMSException
114         */
115        public boolean isDuplicate(Message message) throws JMSException {
116            return isDuplicate(message.getJMSMessageID());
117        }
118    
119        /**
120         * checks whether this messageId has been seen before and adds this
121         * messageId to the list
122         *
123         * @param id
124         * @return true if the message is a duplicate
125         */
126        public boolean isDuplicate(String id) {
127            boolean answer = false;
128            String seed = IdGenerator.getSeedFromId(id);
129            if (seed != null) {
130                BitArrayBin bab = map.get(seed);
131                if (bab == null) {
132                    bab = new BitArrayBin(auditDepth);
133                    map.put(seed, bab);
134                    modified = true;
135                }
136                long index = IdGenerator.getSequenceFromId(id);
137                if (index >= 0) {
138                    answer = bab.setBit(index, true);
139                    modified = true;
140                }
141            }
142            return answer;
143        }
144    
145        /**
146         * Checks if this message has been seen before
147         *
148         * @param message
149         * @return true if the message is a duplicate
150         */
151        public boolean isDuplicate(final MessageReference message) {
152            MessageId id = message.getMessageId();
153            return isDuplicate(id);
154        }
155    
156        /**
157         * Checks if this messageId has been seen before
158         *
159         * @param id
160         * @return true if the message is a duplicate
161         */
162        public boolean isDuplicate(final MessageId id) {
163            boolean answer = false;
164    
165            if (id != null) {
166                ProducerId pid = id.getProducerId();
167                if (pid != null) {
168                    BitArrayBin bab = map.get(pid);
169                    if (bab == null) {
170                        bab = new BitArrayBin(auditDepth);
171                        map.put(pid, bab);
172                        modified = true;
173                    }
174                    answer = bab.setBit(id.getProducerSequenceId(), true);
175                }
176            }
177            return answer;
178        }
179    
180        /**
181         * mark this message as being received
182         *
183         * @param message
184         */
185        public void rollback(final MessageReference message) {
186            MessageId id = message.getMessageId();
187            rollback(id);
188        }
189    
190        /**
191         * mark this message as being received
192         *
193         * @param id
194         */
195        public void rollback(final  MessageId id) {
196            if (id != null) {
197                ProducerId pid = id.getProducerId();
198                if (pid != null) {
199                    BitArrayBin bab = map.get(pid);
200                    if (bab != null) {
201                        bab.setBit(id.getProducerSequenceId(), false);
202                        modified = true;
203                    }
204                }
205            }
206        }
207    
208        public void rollback(final String id) {
209            String seed = IdGenerator.getSeedFromId(id);
210            if (seed != null) {
211                BitArrayBin bab = map.get(seed);
212                if (bab != null) {
213                    long index = IdGenerator.getSequenceFromId(id);
214                    bab.setBit(index, false);
215                    modified = true;
216                }
217            }
218        }
219    
220        /**
221         * Check the message is in order
222         * @param msg
223         * @return
224         * @throws JMSException
225         */
226        public boolean isInOrder(Message msg) throws JMSException {
227            return isInOrder(msg.getJMSMessageID());
228        }
229    
230        /**
231         * Check the message id is in order
232         * @param id
233         * @return
234         */
235        public boolean isInOrder(final String id) {
236            boolean answer = true;
237    
238            if (id != null) {
239                String seed = IdGenerator.getSeedFromId(id);
240                if (seed != null) {
241                    BitArrayBin bab = map.get(seed);
242                    if (bab != null) {
243                        long index = IdGenerator.getSequenceFromId(id);
244                        answer = bab.isInOrder(index);
245                        modified = true;
246                    }
247                }
248            }
249            return answer;
250        }
251    
252        /**
253         * Check the MessageId is in order
254         * @param message
255         * @return
256         */
257        public boolean isInOrder(final MessageReference message) {
258            return isInOrder(message.getMessageId());
259        }
260    
261        /**
262         * Check the MessageId is in order
263         * @param id
264         * @return
265         */
266        public boolean isInOrder(final MessageId id) {
267            boolean answer = false;
268    
269            if (id != null) {
270                ProducerId pid = id.getProducerId();
271                if (pid != null) {
272                    BitArrayBin bab = map.get(pid);
273                    if (bab == null) {
274                        bab = new BitArrayBin(auditDepth);
275                        map.put(pid, bab);
276                        modified = true;
277                    }
278                    answer = bab.isInOrder(id.getProducerSequenceId());
279    
280                }
281            }
282            return answer;
283        }
284    
285        public long getLastSeqId(ProducerId id) {
286            long result = -1;
287            BitArrayBin bab = map.get(id.toString());
288            if (bab != null) {
289                result = bab.getLastSetIndex();
290            }
291            return result;
292        }
293    
294        public void clear() {
295            map.clear();
296        }
297    
298        /**
299         * Returns if the Audit has been modified since last check, this method does not
300         * reset the modified flag.  If the caller needs to reset the flag in order to avoid
301         * serializing an unchanged Audit then its up the them to reset it themselves.
302         *
303         * @return true if the Audit has been modified.
304         */
305        public boolean isModified() {
306            return this.modified;
307        }
308    
309        public void setModified(boolean modified) {
310            this.modified = modified;
311        }
312    
313        /**
314         * Reads and returns the current modified state of the Audit, once called the state is
315         * reset to false.  This method is useful for code the needs to know if it should write
316         * out the Audit or otherwise execute some logic based on the Audit having changed since
317         * last check.
318         *
319         * @return true if the Audit has been modified since last check.
320         */
321        public boolean modified() {
322            if (this.modified) {
323                this.modified = false;
324                return true;
325            }
326    
327            return false;
328        }
329    }