001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.memory;
018    
019    import java.util.Iterator;
020    import java.util.LinkedList;
021    import java.util.List;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    
024    import org.apache.activemq.thread.Task;
025    import org.apache.activemq.thread.TaskRunner;
026    import org.apache.activemq.thread.TaskRunnerFactory;
027    import org.apache.activemq.usage.Usage;
028    import org.apache.activemq.usage.UsageListener;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    public class CacheEvictionUsageListener implements UsageListener {
033    
034        private static final Logger LOG = LoggerFactory.getLogger(CacheEvictionUsageListener.class);
035    
036        private final List<CacheEvictor> evictors = new CopyOnWriteArrayList<CacheEvictor>();
037        private final int usageHighMark;
038        private final int usageLowMark;
039    
040        private final TaskRunner evictionTask;
041        private final Usage usage;
042    
043        public CacheEvictionUsageListener(Usage usage, int usageHighMark, int usageLowMark, TaskRunnerFactory taskRunnerFactory) {
044            this.usage = usage;
045            this.usageHighMark = usageHighMark;
046            this.usageLowMark = usageLowMark;
047            evictionTask = taskRunnerFactory.createTaskRunner(new Task() {
048                public boolean iterate() {
049                    return evictMessages();
050                }
051            }, "Cache Evictor: " + System.identityHashCode(this));
052        }
053    
054        boolean evictMessages() {
055            // Try to take the memory usage down below the low mark.
056            LOG.debug("Evicting cache memory usage: " + usage.getPercentUsage());
057    
058            List<CacheEvictor> list = new LinkedList<CacheEvictor>(evictors);
059            while (list.size() > 0 && usage.getPercentUsage() > usageLowMark) {
060    
061                // Evenly evict messages from all evictors
062                for (Iterator<CacheEvictor> iter = list.iterator(); iter.hasNext();) {
063                    CacheEvictor evictor = iter.next();
064                    if (evictor.evictCacheEntry() == null) {
065                        iter.remove();
066                    }
067                }
068            }
069            return false;
070        }
071    
072        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
073            // Do we need to start evicting cache entries? Usage > than the
074            // high mark
075            if (oldPercentUsage < newPercentUsage && usage.getPercentUsage() >= usageHighMark) {
076                try {
077                    evictionTask.wakeup();
078                } catch (InterruptedException e) {
079                    Thread.currentThread().interrupt();
080                }
081            }
082        }
083    
084        public void add(CacheEvictor evictor) {
085            evictors.add(evictor);
086        }
087    
088        public void remove(CacheEvictor evictor) {
089            evictors.remove(evictor);
090        }
091    }