001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.broker.region.SubscriptionRecovery;
028import org.apache.activemq.broker.region.Topic;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.filter.DestinationFilter;
032import org.apache.activemq.thread.Scheduler;
033
034/**
035 * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed
036 * buffer of messages around in memory and use that to recover new
037 * subscriptions.
038 * 
039 * @org.apache.xbean.XBean
040 * 
041 */
042public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
043
044    private static final int GC_INTERVAL = 1000;
045    private Scheduler scheduler;
046    
047    // TODO: need to get a better synchronized linked list that has little
048    // contention between enqueuing and dequeuing
049    private final List<TimestampWrapper> buffer = Collections.synchronizedList(new LinkedList<TimestampWrapper>());
050    private volatile long lastGCRun = System.currentTimeMillis();
051
052    private long recoverDuration = 60 * 1000; // Buffer for 1 min.
053
054    static class TimestampWrapper {
055        public MessageReference message;
056        public long timestamp;
057
058        public TimestampWrapper(MessageReference message, long timestamp) {
059            this.message = message;
060            this.timestamp = timestamp;
061        }
062    }
063
064    private final Runnable gcTask = new Runnable() {
065        public void run() {
066            gc();
067        }
068    };
069
070    public SubscriptionRecoveryPolicy copy() {
071        TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy();
072        rc.setRecoverDuration(recoverDuration);
073        return rc;
074    }
075
076    public boolean add(ConnectionContext context, MessageReference message) throws Exception {
077        buffer.add(new TimestampWrapper(message, lastGCRun));
078        return true;
079    }
080
081    public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
082        // Re-dispatch the messages from the buffer.
083        ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer);
084        if (!copy.isEmpty()) {
085            for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) {
086                TimestampWrapper timestampWrapper = iter.next();
087                MessageReference message = timestampWrapper.message;
088                sub.addRecoveredMessage(context, message);
089            }
090        }
091    }
092    
093    public void setBroker(Broker broker) {  
094        this.scheduler = broker.getScheduler();
095    }
096
097    public void start() throws Exception {
098        scheduler.executePeriodically(gcTask, GC_INTERVAL);
099    }
100
101    public void stop() throws Exception {
102        scheduler.cancel(gcTask);
103    }
104    
105
106    public void gc() {
107        lastGCRun = System.currentTimeMillis();
108        while (buffer.size() > 0) {
109            TimestampWrapper timestampWrapper = buffer.get(0);
110            if (lastGCRun > timestampWrapper.timestamp + recoverDuration) {
111                // GC it.
112                buffer.remove(0);
113            } else {
114                break;
115            }
116        }
117    }
118
119    public long getRecoverDuration() {
120        return recoverDuration;
121    }
122
123    public void setRecoverDuration(long recoverDuration) {
124        this.recoverDuration = recoverDuration;
125    }
126
127    public Message[] browse(ActiveMQDestination destination) throws Exception {
128        List<Message> result = new ArrayList<Message>();
129        ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer);
130        DestinationFilter filter = DestinationFilter.parseFilter(destination);
131        for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) {
132            TimestampWrapper timestampWrapper = iter.next();
133            MessageReference ref = timestampWrapper.message;
134            Message message = ref.getMessage();
135            if (filter.matches(message.getDestination())) {
136                result.add(message);
137            }
138        }
139        return result.toArray(new Message[result.size()]);
140    }
141
142}