001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import org.apache.activemq.broker.Broker;
022    import org.apache.activemq.broker.ConnectionContext;
023    import org.apache.activemq.broker.region.MessageReference;
024    import org.apache.activemq.broker.region.SubscriptionRecovery;
025    import org.apache.activemq.broker.region.Topic;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.filter.DestinationFilter;
029    
030    /**
031     * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
032     * count of last messages.
033     * 
034     * @org.apache.xbean.XBean
035     * 
036     */
037    public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
038        private volatile MessageReference messages[];
039        private int maximumSize = 100;
040        private int tail;
041    
042        public SubscriptionRecoveryPolicy copy() {
043            FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy();
044            rc.setMaximumSize(maximumSize);
045            return rc;
046        }
047    
048        public synchronized boolean add(ConnectionContext context, MessageReference node) throws Exception {
049            messages[tail++] = node;
050            if (tail >= messages.length) {
051                tail = 0;
052            }
053            return true;
054        }
055    
056        public synchronized void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
057            // Re-dispatch the last message seen.
058            int t = tail;
059            // The buffer may not have rolled over yet..., start from the front
060            if (messages[t] == null) {
061                t = 0;
062            }
063            // Well the buffer is really empty then.
064            if (messages[t] == null) {
065                return;
066            }
067            // Keep dispatching until t hit's tail again.
068            do {
069                MessageReference node = messages[t];
070                sub.addRecoveredMessage(context, node);
071                t++;
072                if (t >= messages.length) {
073                    t = 0;
074                }
075            } while (t != tail);
076        }
077    
078        public void start() throws Exception {
079            messages = new MessageReference[maximumSize];
080        }
081    
082        public void stop() throws Exception {
083            messages = null;
084        }
085    
086        public int getMaximumSize() {
087            return maximumSize;
088        }
089    
090        /**
091         * Sets the maximum number of messages that this destination will hold
092         * around in RAM
093         */
094        public void setMaximumSize(int maximumSize) {
095            this.maximumSize = maximumSize;
096        }
097    
098        public synchronized Message[] browse(ActiveMQDestination destination) throws Exception {
099            List<Message> result = new ArrayList<Message>();
100            DestinationFilter filter = DestinationFilter.parseFilter(destination);
101            int t = tail;
102            if (messages[t] == null) {
103                t = 0;
104            }
105            if (messages[t] != null) {
106                do {
107                    MessageReference ref = messages[t];
108                    Message message = ref.getMessage();
109                    if (filter.matches(message.getDestination())) {
110                        result.add(message);
111                    }
112                    t++;
113                    if (t >= messages.length) {
114                        t = 0;
115                    }
116                } while (t != tail);
117            }
118            return result.toArray(new Message[result.size()]);
119        }
120    
121        public void setBroker(Broker broker) {        
122        }
123    
124    }