001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.usage;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import org.apache.activemq.Service;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * Used to keep track of how much of something is being used so that a
032     * productive working set usage can be controlled. Main use case is manage
033     * memory usage.
034     * 
035     * @org.apache.xbean.XBean
036     * 
037     */
038    public abstract class Usage<T extends Usage> implements Service {
039    
040        private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
041        protected final Object usageMutex = new Object();
042        protected int percentUsage;
043        protected T parent;
044        private UsageCapacity limiter = new DefaultUsageCapacity();
045        private int percentUsageMinDelta = 1;
046        private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
047        private final boolean debug = LOG.isDebugEnabled();
048        protected String name;
049        private float usagePortion = 1.0f;
050        private final List<T> children = new CopyOnWriteArrayList<T>();
051        private final List<Runnable> callbacks = new LinkedList<Runnable>();
052        private int pollingTime = 100;
053        private final AtomicBoolean started=new AtomicBoolean();
054        private ThreadPoolExecutor executor;
055        public Usage(T parent, String name, float portion) {
056            this.parent = parent;
057            this.usagePortion = portion;
058            if (parent != null) {
059                this.limiter.setLimit((long)(parent.getLimit() * portion));
060                name = parent.name + ":" + name;
061            }
062            this.name = name;
063        }
064    
065        protected abstract long retrieveUsage();
066    
067        /**
068         * @throws InterruptedException
069         */
070        public void waitForSpace() throws InterruptedException {
071            waitForSpace(0);
072        }
073    
074        public boolean waitForSpace(long timeout) throws InterruptedException {
075            return waitForSpace(timeout, 100);
076        }
077        
078        /**
079         * @param timeout
080         * @throws InterruptedException
081         * @return true if space
082         */
083        public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
084            if (parent != null) {
085                if (!parent.waitForSpace(timeout, highWaterMark)) {
086                    return false;
087                }
088            }
089            synchronized (usageMutex) {
090                percentUsage=caclPercentUsage();
091                if (percentUsage >= highWaterMark) {
092                    long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
093                    long timeleft = deadline;
094                    while (timeleft > 0) {
095                        percentUsage=caclPercentUsage();
096                        if (percentUsage >= highWaterMark) {
097                            usageMutex.wait(pollingTime);
098                            timeleft = deadline - System.currentTimeMillis();
099                        } else {
100                            break;
101                        }
102                    }
103                }
104                return percentUsage < highWaterMark;
105            }
106        }
107    
108        public boolean isFull() {
109            return isFull(100);
110        }
111        
112        public boolean isFull(int highWaterMark) {
113            if (parent != null && parent.isFull(highWaterMark)) {
114                return true;
115            }
116            synchronized (usageMutex) {
117                percentUsage=caclPercentUsage();
118                return percentUsage >= highWaterMark;
119            }
120        }
121    
122        public void addUsageListener(UsageListener listener) {
123            listeners.add(listener);
124        }
125    
126        public void removeUsageListener(UsageListener listener) {
127            listeners.remove(listener);
128        }
129    
130        public long getLimit() {
131            synchronized (usageMutex) {
132                return limiter.getLimit();
133            }
134        }
135    
136        /**
137         * Sets the memory limit in bytes. Setting the limit in bytes will set the
138         * usagePortion to 0 since the UsageManager is not going to be portion based
139         * off the parent.
140         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
141         * 
142         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
143         */
144        public void setLimit(long limit) {
145            if (percentUsageMinDelta < 0) {
146                throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
147            }
148            synchronized (usageMutex) {
149                this.limiter.setLimit(limit);
150                this.usagePortion = 0;
151            }
152            onLimitChange();
153        }
154    
155        protected void onLimitChange() {
156            // We may need to calculate the limit
157            if (usagePortion > 0 && parent != null) {
158                synchronized (usageMutex) {
159                    this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
160                }
161            }
162            // Reset the percent currently being used.
163            int percentUsage;
164            synchronized (usageMutex) {
165                percentUsage = caclPercentUsage();
166            }
167            setPercentUsage(percentUsage);
168            // Let the children know that the limit has changed. They may need to
169            // set
170            // their limits based on ours.
171            for (T child : children) {
172                child.onLimitChange();
173            }
174        }
175    
176        public float getUsagePortion() {
177            synchronized (usageMutex) {
178                return usagePortion;
179            }
180        }
181    
182        public void setUsagePortion(float usagePortion) {
183            synchronized (usageMutex) {
184                this.usagePortion = usagePortion;
185            }
186            onLimitChange();
187        }
188    
189        public int getPercentUsage() {
190            synchronized (usageMutex) {
191                return percentUsage;
192            }
193        }
194    
195        public int getPercentUsageMinDelta() {
196            synchronized (usageMutex) {
197                return percentUsageMinDelta;
198            }
199        }
200    
201        /**
202         * Sets the minimum number of percentage points the usage has to change
203         * before a UsageListener event is fired by the manager.
204         * 
205         * @param percentUsageMinDelta
206         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
207         */
208        public void setPercentUsageMinDelta(int percentUsageMinDelta) {
209            if (percentUsageMinDelta < 1) {
210                throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
211            }
212            int percentUsage;
213            synchronized (usageMutex) {
214                this.percentUsageMinDelta = percentUsageMinDelta;
215                percentUsage = caclPercentUsage();
216            }
217            setPercentUsage(percentUsage);
218        }
219    
220        public long getUsage() {
221            synchronized (usageMutex) {
222                return retrieveUsage();
223            }
224        }
225    
226        protected void setPercentUsage(int value) {
227            synchronized (usageMutex) {
228                int oldValue = percentUsage;
229                percentUsage = value;
230                if (oldValue != value) {
231                    fireEvent(oldValue, value);
232                }
233            }
234        }
235    
236        protected int caclPercentUsage() {
237            if (limiter.getLimit() == 0) {
238                return 0;
239            }
240            return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
241        }
242    
243        private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
244            if (debug) {
245                LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 
246                    + newPercentUsage + "% of available memory");
247            }   
248            if (started.get()) {
249                // Switching from being full to not being full..
250                if (oldPercentUsage >= 100 && newPercentUsage < 100) {
251                    synchronized (usageMutex) {
252                        usageMutex.notifyAll();
253                        if (!callbacks.isEmpty()) {
254                            for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
255                                Runnable callback = iter.next();
256                                getExecutor().execute(callback);
257                            }
258                            callbacks.clear();
259                        }
260                    }
261                }
262                if (!listeners.isEmpty()) {
263                    // Let the listeners know on a separate thread
264                    Runnable listenerNotifier = new Runnable() {
265                        public void run() {
266                            for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
267                                UsageListener l = iter.next();
268                                l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
269                            }
270                        }
271                    };
272                    if (started.get()) {
273                        getExecutor().execute(listenerNotifier);
274                    } else {
275                        LOG.warn("Not notifying memory usage change to listeners on shutdown");
276                    }
277                }
278            }
279        }
280    
281        public String getName() {
282            return name;
283        }
284    
285        @Override
286        public String toString() {
287            return "Usage(" + getName() + ") percentUsage=" + percentUsage
288                    + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
289                    + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
290                    + (parent != null ? ";Parent:" + parent.toString() : "");
291        }
292    
293        @SuppressWarnings("unchecked")
294        public void start() {
295            if (started.compareAndSet(false, true)){
296                if (parent != null) {
297                    parent.addChild(this);
298                }
299                for (T t:children) {
300                    t.start();
301                }
302            }
303        }
304    
305        @SuppressWarnings("unchecked")
306        public void stop() {
307            if (started.compareAndSet(true, false)){
308                if (parent != null) {
309                    parent.removeChild(this);
310                }
311                
312                //clear down any callbacks
313                synchronized (usageMutex) {
314                    usageMutex.notifyAll();
315                    for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
316                        Runnable callback = iter.next();
317                        callback.run();
318                    }
319                    this.callbacks.clear();
320                }
321                for (T t:children) {
322                    t.stop();
323                }
324            }
325        }
326    
327        protected void addChild(T child) {
328            children.add(child);
329            if (started.get()) {
330                child.start();
331            }
332        }
333    
334        protected void removeChild(T child) {
335            children.remove(child);
336        }
337    
338        /**
339         * @param callback
340         * @return true if the UsageManager was full. The callback will only be
341         *         called if this method returns true.
342         */
343        public boolean notifyCallbackWhenNotFull(final Runnable callback) {
344            if (parent != null) {
345                Runnable r = new Runnable() {
346    
347                    public void run() {
348                        synchronized (usageMutex) {
349                            if (percentUsage >= 100) {
350                                callbacks.add(callback);
351                            } else {
352                                callback.run();
353                            }
354                        }
355                    }
356                };
357                if (parent.notifyCallbackWhenNotFull(r)) {
358                    return true;
359                }
360            }
361            synchronized (usageMutex) {
362                if (percentUsage >= 100) {
363                    callbacks.add(callback);
364                    return true;
365                } else {
366                    return false;
367                }
368            }
369        }
370    
371        /**
372         * @return the limiter
373         */
374        public UsageCapacity getLimiter() {
375            return this.limiter;
376        }
377    
378        /**
379         * @param limiter the limiter to set
380         */
381        public void setLimiter(UsageCapacity limiter) {
382            this.limiter = limiter;
383        }
384    
385        /**
386         * @return the pollingTime
387         */
388        public int getPollingTime() {
389            return this.pollingTime;
390        }
391    
392        /**
393         * @param pollingTime the pollingTime to set
394         */
395        public void setPollingTime(int pollingTime) {
396            this.pollingTime = pollingTime;
397        }
398    
399        public void setName(String name) {
400            this.name = name;
401        }
402    
403        public T getParent() {
404            return parent;
405        }
406    
407        public void setParent(T parent) {
408            this.parent = parent;
409        }
410        
411        public void setExecutor (ThreadPoolExecutor executor) {
412            this.executor = executor;
413        }
414        public ThreadPoolExecutor getExecutor() {
415            return executor;
416        }
417    }