001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.Serializable;
020    
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.command.MessageId;
026    import org.apache.activemq.command.ProducerId;
027    import org.apache.activemq.util.BitArrayBin;
028    import org.apache.activemq.util.IdGenerator;
029    import org.apache.activemq.util.LRUCache;
030    
031    /**
032     * Provides basic audit functions for Messages without sync
033     * 
034     * 
035     */
036    public class ActiveMQMessageAuditNoSync implements Serializable {
037    
038        private static final long serialVersionUID = 1L;
039    
040        public static final int DEFAULT_WINDOW_SIZE = 2048;
041        public static final int MAXIMUM_PRODUCER_COUNT = 64;
042        private int auditDepth;
043        private int maximumNumberOfProducersToTrack;
044        private LRUCache<Object, BitArrayBin> map;
045    
046        /**
047         * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
048         * 64
049         */
050        public ActiveMQMessageAuditNoSync() {
051            this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052        }
053    
054        /**
055         * Construct a MessageAudit
056         * 
057         * @param auditDepth range of ids to track
058         * @param maximumNumberOfProducersToTrack number of producers expected in
059         *                the system
060         */
061        public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
062            this.auditDepth = auditDepth;
063            this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
064            this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
065        }
066        
067        /**
068         * @return the auditDepth
069         */
070        public int getAuditDepth() {
071            return auditDepth;
072        }
073    
074        /**
075         * @param auditDepth the auditDepth to set
076         */
077        public void setAuditDepth(int auditDepth) {
078            this.auditDepth = auditDepth;
079        }
080    
081        /**
082         * @return the maximumNumberOfProducersToTrack
083         */
084        public int getMaximumNumberOfProducersToTrack() {
085            return maximumNumberOfProducersToTrack;
086        }
087    
088        /**
089         * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090         */
091        public void setMaximumNumberOfProducersToTrack(
092                int maximumNumberOfProducersToTrack) {
093            this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
094            this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
095        }
096    
097        /**
098         * Checks if this message has been seen before
099         * 
100         * @param message
101         * @return true if the message is a duplicate
102         * @throws JMSException
103         */
104        public boolean isDuplicate(Message message) throws JMSException {
105            return isDuplicate(message.getJMSMessageID());
106        }
107    
108        /**
109         * checks whether this messageId has been seen before and adds this
110         * messageId to the list
111         * 
112         * @param id
113         * @return true if the message is a duplicate
114         */
115        public boolean isDuplicate(String id) {
116            boolean answer = false;
117            String seed = IdGenerator.getSeedFromId(id);
118            if (seed != null) {
119                BitArrayBin bab = map.get(seed);
120                if (bab == null) {
121                    bab = new BitArrayBin(auditDepth);
122                    map.put(seed, bab);
123                }
124                long index = IdGenerator.getSequenceFromId(id);
125                if (index >= 0) {
126                    answer = bab.setBit(index, true);
127                }
128            }
129            return answer;
130        }
131    
132        /**
133         * Checks if this message has been seen before
134         * 
135         * @param message
136         * @return true if the message is a duplicate
137         */
138        public boolean isDuplicate(final MessageReference message) {
139            MessageId id = message.getMessageId();
140            return isDuplicate(id);
141        }
142        
143        /**
144         * Checks if this messageId has been seen before
145         * 
146         * @param id
147         * @return true if the message is a duplicate
148         */
149        public boolean isDuplicate(final MessageId id) {
150            boolean answer = false;
151            
152            if (id != null) {
153                ProducerId pid = id.getProducerId();
154                if (pid != null) {
155                    BitArrayBin bab = map.get(pid);
156                    if (bab == null) {
157                        bab = new BitArrayBin(auditDepth);
158                        map.put(pid, bab);
159                    }
160                    answer = bab.setBit(id.getProducerSequenceId(), true);
161                }
162            }
163            return answer;
164        }
165    
166        /**
167         * mark this message as being received
168         * 
169         * @param message
170         */
171        public void rollback(final MessageReference message) {
172            MessageId id = message.getMessageId();
173            rollback(id);
174        }
175        
176        /**
177         * mark this message as being received
178         * 
179         * @param id
180         */
181        public void rollback(final  MessageId id) {
182            if (id != null) {
183                ProducerId pid = id.getProducerId();
184                if (pid != null) {
185                    BitArrayBin bab = map.get(pid);
186                    if (bab != null) {
187                        bab.setBit(id.getProducerSequenceId(), false);
188                    }
189                }
190            }
191        }
192    
193        public void rollback(final String id) {
194            String seed = IdGenerator.getSeedFromId(id);
195            if (seed != null) {
196                BitArrayBin bab = map.get(seed);
197                if (bab != null) {
198                    long index = IdGenerator.getSequenceFromId(id);
199                    bab.setBit(index, false);
200                }
201            }
202        }
203        
204        /**
205         * Check the message is in order
206         * @param msg
207         * @return
208         * @throws JMSException
209         */
210        public boolean isInOrder(Message msg) throws JMSException {
211            return isInOrder(msg.getJMSMessageID());
212        }
213        
214        /**
215         * Check the message id is in order
216         * @param id
217         * @return
218         */
219        public boolean isInOrder(final String id) {
220            boolean answer = true;
221            
222            if (id != null) {
223                String seed = IdGenerator.getSeedFromId(id);
224                if (seed != null) {
225                    BitArrayBin bab = map.get(seed);
226                    if (bab != null) {
227                        long index = IdGenerator.getSequenceFromId(id);
228                        answer = bab.isInOrder(index);
229                    }
230                   
231                }
232            }
233            return answer;
234        }
235        
236        /**
237         * Check the MessageId is in order
238         * @param message 
239         * @return
240         */
241        public boolean isInOrder(final MessageReference message) {
242            return isInOrder(message.getMessageId());
243        }
244        
245        /**
246         * Check the MessageId is in order
247         * @param id
248         * @return
249         */
250        public boolean isInOrder(final MessageId id) {
251            boolean answer = false;
252    
253            if (id != null) {
254                ProducerId pid = id.getProducerId();
255                if (pid != null) {
256                    BitArrayBin bab = map.get(pid);
257                    if (bab == null) {
258                        bab = new BitArrayBin(auditDepth);
259                        map.put(pid, bab);
260                    }
261                    answer = bab.isInOrder(id.getProducerSequenceId());
262    
263                }
264            }
265            return answer;
266        }
267    
268        public long getLastSeqId(ProducerId id) {
269            long result = -1;
270            BitArrayBin bab = map.get(id.toString());
271            if (bab != null) {
272                result = bab.getLastSetIndex();
273            }
274            return result;
275        }
276    
277        public void clear() {
278            map.clear();
279        }
280    }