001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.Serializable;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.MessageId;
026import org.apache.activemq.command.ProducerId;
027import org.apache.activemq.util.BitArrayBin;
028import org.apache.activemq.util.IdGenerator;
029import org.apache.activemq.util.LRUCache;
030
031/**
032 * Provides basic audit functions for Messages without sync
033 */
034public class ActiveMQMessageAuditNoSync implements Serializable {
035
036    private static final long serialVersionUID = 1L;
037
038    public static final int DEFAULT_WINDOW_SIZE = 2048;
039    public static final int MAXIMUM_PRODUCER_COUNT = 64;
040    private int auditDepth;
041    private int maximumNumberOfProducersToTrack;
042    private final LRUCache<String, BitArrayBin> map;
043    private transient boolean modified = true;
044
045    /**
046     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64
047     */
048    public ActiveMQMessageAuditNoSync() {
049        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
050    }
051
052    /**
053     * Construct a MessageAudit
054     *
055     * @param auditDepth range of ids to track
056     * @param maximumNumberOfProducersToTrack number of producers expected in the system
057     */
058    public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
059        this.auditDepth = auditDepth;
060        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
061        this.map = new LRUCache<String, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
062    }
063
064    /**
065     * @return the auditDepth
066     */
067    public int getAuditDepth() {
068        return auditDepth;
069    }
070
071    /**
072     * @param auditDepth the auditDepth to set
073     */
074    public void setAuditDepth(int auditDepth) {
075        this.auditDepth = auditDepth;
076        this.modified = true;
077    }
078
079    /**
080     * @return the maximumNumberOfProducersToTrack
081     */
082    public int getMaximumNumberOfProducersToTrack() {
083        return maximumNumberOfProducersToTrack;
084    }
085
086    /**
087     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
088     */
089    public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) {
090
091        if (maximumNumberOfProducersToTrack < this.maximumNumberOfProducersToTrack){
092            LRUCache<String, BitArrayBin> newMap = new LRUCache<String, BitArrayBin>(0,maximumNumberOfProducersToTrack,0.75f,true);
093            /**
094             * As putAll will access the entries in the right order,
095             * this shouldn't result in wrong cache entries being removed
096             */
097            newMap.putAll(this.map);
098            this.map.clear();
099            this.map.putAll(newMap);
100        }
101        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
102        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
103        this.modified = true;
104    }
105
106    /**
107     * Checks if this message has been seen before
108     *
109     * @param message
110     * @return true if the message is a duplicate
111     * @throws JMSException
112     */
113    public boolean isDuplicate(Message message) throws JMSException {
114        return isDuplicate(message.getJMSMessageID());
115    }
116
117    /**
118     * checks whether this messageId has been seen before and adds this
119     * messageId to the list
120     *
121     * @param id
122     * @return true if the message is a duplicate
123     */
124    public boolean isDuplicate(String id) {
125        boolean answer = false;
126        String seed = IdGenerator.getSeedFromId(id);
127        if (seed != null) {
128            BitArrayBin bab = map.get(seed);
129            if (bab == null) {
130                bab = new BitArrayBin(auditDepth);
131                map.put(seed, bab);
132                modified = true;
133            }
134            long index = IdGenerator.getSequenceFromId(id);
135            if (index >= 0) {
136                answer = bab.setBit(index, true);
137                modified = true;
138            }
139        }
140        return answer;
141    }
142
143    /**
144     * Checks if this message has been seen before
145     *
146     * @param message
147     * @return true if the message is a duplicate
148     */
149    public boolean isDuplicate(final MessageReference message) {
150        MessageId id = message.getMessageId();
151        return isDuplicate(id);
152    }
153
154    /**
155     * Checks if this messageId has been seen before
156     *
157     * @param id
158     * @return true if the message is a duplicate
159     */
160    public boolean isDuplicate(final MessageId id) {
161        boolean answer = false;
162
163        if (id != null) {
164            ProducerId pid = id.getProducerId();
165            if (pid != null) {
166                BitArrayBin bab = map.get(pid.toString());
167                if (bab == null) {
168                    bab = new BitArrayBin(auditDepth);
169                    map.put(pid.toString(), bab);
170                    modified = true;
171                }
172                answer = bab.setBit(id.getProducerSequenceId(), true);
173            }
174        }
175        return answer;
176    }
177
178    /**
179     * mark this message as being received
180     *
181     * @param message
182     */
183    public void rollback(final MessageReference message) {
184        MessageId id = message.getMessageId();
185        rollback(id);
186    }
187
188    /**
189     * mark this message as being received
190     *
191     * @param id
192     */
193    public void rollback(final  MessageId id) {
194        if (id != null) {
195            ProducerId pid = id.getProducerId();
196            if (pid != null) {
197                BitArrayBin bab = map.get(pid.toString());
198                if (bab != null) {
199                    bab.setBit(id.getProducerSequenceId(), false);
200                    modified = true;
201                }
202            }
203        }
204    }
205
206    public void rollback(final String id) {
207        String seed = IdGenerator.getSeedFromId(id);
208        if (seed != null) {
209            BitArrayBin bab = map.get(seed);
210            if (bab != null) {
211                long index = IdGenerator.getSequenceFromId(id);
212                bab.setBit(index, false);
213                modified = true;
214            }
215        }
216    }
217
218    /**
219     * Check the message is in order
220     *
221     * @param msg
222     *
223     * @return true if the id is in order
224     *
225     * @throws JMSException
226     */
227    public boolean isInOrder(Message msg) throws JMSException {
228        return isInOrder(msg.getJMSMessageID());
229    }
230
231    /**
232     * Check the message id is in order
233     *
234     * @param id
235     *
236     * @return true if the id is in order
237     */
238    public boolean isInOrder(final String id) {
239        boolean answer = true;
240
241        if (id != null) {
242            String seed = IdGenerator.getSeedFromId(id);
243            if (seed != null) {
244                BitArrayBin bab = map.get(seed);
245                if (bab != null) {
246                    long index = IdGenerator.getSequenceFromId(id);
247                    answer = bab.isInOrder(index);
248                    modified = true;
249                }
250            }
251        }
252        return answer;
253    }
254
255    /**
256     * Check the MessageId is in order
257     *
258     * @param message
259     *
260     * @return true if the id is in order
261     */
262    public boolean isInOrder(final MessageReference message) {
263        return isInOrder(message.getMessageId());
264    }
265
266    /**
267     * Check the MessageId is in order
268     *
269     * @param id
270     *
271     * @return true if the id is in order
272     */
273    public boolean isInOrder(final MessageId id) {
274        boolean answer = false;
275
276        if (id != null) {
277            ProducerId pid = id.getProducerId();
278            if (pid != null) {
279                BitArrayBin bab = map.get(pid.toString());
280                if (bab == null) {
281                    bab = new BitArrayBin(auditDepth);
282                    map.put(pid.toString(), bab);
283                    modified = true;
284                }
285                answer = bab.isInOrder(id.getProducerSequenceId());
286
287            }
288        }
289        return answer;
290    }
291
292    public long getLastSeqId(ProducerId id) {
293        long result = -1;
294        BitArrayBin bab = map.get(id.toString());
295        if (bab != null) {
296            result = bab.getLastSetIndex();
297        }
298        return result;
299    }
300
301    public void clear() {
302        map.clear();
303    }
304
305    /**
306     * Returns if the Audit has been modified since last check, this method does not
307     * reset the modified flag.  If the caller needs to reset the flag in order to avoid
308     * serializing an unchanged Audit then its up the them to reset it themselves.
309     *
310     * @return true if the Audit has been modified.
311     */
312    public boolean isModified() {
313        return this.modified;
314    }
315
316    public void setModified(boolean modified) {
317        this.modified = modified;
318    }
319
320    /**
321     * Reads and returns the current modified state of the Audit, once called the state is
322     * reset to false.  This method is useful for code the needs to know if it should write
323     * out the Audit or otherwise execute some logic based on the Audit having changed since
324     * last check.
325     *
326     * @return true if the Audit has been modified since last check.
327     */
328    public boolean modified() {
329        if (this.modified) {
330            this.modified = false;
331            return true;
332        }
333
334        return false;
335    }
336}