001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.usage;
018    
019    import java.util.LinkedList;
020    import java.util.List;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    import java.util.concurrent.ThreadPoolExecutor;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    import java.util.concurrent.locks.Condition;
026    import java.util.concurrent.locks.ReentrantReadWriteLock;
027    
028    import org.apache.activemq.Service;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
034     * Main use case is manage memory usage.
035     *
036     * @org.apache.xbean.XBean
037     *
038     */
039    public abstract class Usage<T extends Usage> implements Service {
040    
041        private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
042    
043        protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
044        protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
045        protected int percentUsage;
046        protected T parent;
047        protected String name;
048    
049        private UsageCapacity limiter = new DefaultUsageCapacity();
050        private int percentUsageMinDelta = 1;
051        private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
052        private final boolean debug = LOG.isDebugEnabled();
053        private float usagePortion = 1.0f;
054        private final List<T> children = new CopyOnWriteArrayList<T>();
055        private final List<Runnable> callbacks = new LinkedList<Runnable>();
056        private int pollingTime = 100;
057        private final AtomicBoolean started = new AtomicBoolean();
058        private ThreadPoolExecutor executor;
059    
060        public Usage(T parent, String name, float portion) {
061            this.parent = parent;
062            this.usagePortion = portion;
063            if (parent != null) {
064                this.limiter.setLimit((long) (parent.getLimit() * (double)portion));
065                name = parent.name + ":" + name;
066            }
067            this.name = name;
068        }
069    
070        protected abstract long retrieveUsage();
071    
072        /**
073         * @throws InterruptedException
074         */
075        public void waitForSpace() throws InterruptedException {
076            waitForSpace(0);
077        }
078    
079        public boolean waitForSpace(long timeout) throws InterruptedException {
080            return waitForSpace(timeout, 100);
081        }
082    
083        /**
084         * @param timeout
085         * @throws InterruptedException
086         * @return true if space
087         */
088        public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
089            if (parent != null) {
090                if (!parent.waitForSpace(timeout, highWaterMark)) {
091                    return false;
092                }
093            }
094            usageLock.writeLock().lock();
095            try {
096                percentUsage = caclPercentUsage();
097                if (percentUsage >= highWaterMark) {
098                    long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
099                    long timeleft = deadline;
100                    while (timeleft > 0) {
101                        percentUsage = caclPercentUsage();
102                        if (percentUsage >= highWaterMark) {
103                            waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS);
104                            timeleft = deadline - System.currentTimeMillis();
105                        } else {
106                            break;
107                        }
108                    }
109                }
110                return percentUsage < highWaterMark;
111            } finally {
112                usageLock.writeLock().unlock();
113            }
114        }
115    
116        public boolean isFull() {
117            return isFull(100);
118        }
119    
120        public boolean isFull(int highWaterMark) {
121            if (parent != null && parent.isFull(highWaterMark)) {
122                return true;
123            }
124            usageLock.writeLock().lock();
125            try {
126                percentUsage = caclPercentUsage();
127                return percentUsage >= highWaterMark;
128            } finally {
129                usageLock.writeLock().unlock();
130            }
131        }
132    
133        public void addUsageListener(UsageListener listener) {
134            listeners.add(listener);
135        }
136    
137        public void removeUsageListener(UsageListener listener) {
138            listeners.remove(listener);
139        }
140    
141        public long getLimit() {
142            usageLock.readLock().lock();
143            try {
144                return limiter.getLimit();
145            } finally {
146                usageLock.readLock().unlock();
147            }
148        }
149    
150        /**
151         * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
152         * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
153         * "1g" can be used
154         *
155         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
156         */
157        public void setLimit(long limit) {
158            if (percentUsageMinDelta < 0) {
159                throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
160            }
161            usageLock.writeLock().lock();
162            try {
163                this.limiter.setLimit(limit);
164                this.usagePortion = 0;
165            } finally {
166                usageLock.writeLock().unlock();
167            }
168            onLimitChange();
169        }
170    
171        protected void onLimitChange() {
172            // We may need to calculate the limit
173            if (usagePortion > 0 && parent != null) {
174                usageLock.writeLock().lock();
175                try {
176                    this.limiter.setLimit((long) (parent.getLimit() * (double) usagePortion));
177                } finally {
178                    usageLock.writeLock().unlock();
179                }
180            }
181            // Reset the percent currently being used.
182            usageLock.writeLock().lock();
183            try {
184                setPercentUsage(caclPercentUsage());
185            } finally {
186                usageLock.writeLock().unlock();
187            }
188            // Let the children know that the limit has changed. They may need to
189            // set their limits based on ours.
190            for (T child : children) {
191                child.onLimitChange();
192            }
193        }
194    
195        public float getUsagePortion() {
196            usageLock.readLock().lock();
197            try {
198                return usagePortion;
199            } finally {
200                usageLock.readLock().unlock();
201            }
202        }
203    
204        public void setUsagePortion(float usagePortion) {
205            usageLock.writeLock().lock();
206            try {
207                this.usagePortion = usagePortion;
208            } finally {
209                usageLock.writeLock().unlock();
210            }
211            onLimitChange();
212        }
213    
214        public int getPercentUsage() {
215            usageLock.readLock().lock();
216            try {
217                return percentUsage;
218            } finally {
219                usageLock.readLock().unlock();
220            }
221        }
222    
223        public int getPercentUsageMinDelta() {
224            usageLock.readLock().lock();
225            try {
226                return percentUsageMinDelta;
227            } finally {
228                usageLock.readLock().unlock();
229            }
230        }
231    
232        /**
233         * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
234         * manager.
235         *
236         * @param percentUsageMinDelta
237         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
238         */
239        public void setPercentUsageMinDelta(int percentUsageMinDelta) {
240            if (percentUsageMinDelta < 1) {
241                throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
242            }
243    
244            usageLock.writeLock().lock();
245            try {
246                this.percentUsageMinDelta = percentUsageMinDelta;
247                setPercentUsage(caclPercentUsage());
248            } finally {
249                usageLock.writeLock().unlock();
250            }
251        }
252    
253        public long getUsage() {
254            usageLock.readLock().lock();
255            try {
256                return retrieveUsage();
257            } finally {
258                usageLock.readLock().unlock();
259            }
260        }
261    
262        protected void setPercentUsage(int value) {
263            usageLock.writeLock().lock();
264            try {
265                int oldValue = percentUsage;
266                percentUsage = value;
267                if (oldValue != value) {
268                    fireEvent(oldValue, value);
269                }
270            } finally {
271                usageLock.writeLock().unlock();
272            }
273        }
274    
275        protected int caclPercentUsage() {
276            if (limiter.getLimit() == 0) {
277                return 0;
278            }
279            return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
280        }
281    
282        // Must be called with the usage lock's writeLock held.
283        private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
284            if (debug) {
285                LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
286            }
287            if (started.get()) {
288                // Switching from being full to not being full..
289                if (oldPercentUsage >= 100 && newPercentUsage < 100) {
290                    waitForSpaceCondition.signalAll();
291                    if (!callbacks.isEmpty()) {
292                        for (Runnable callback : callbacks) {
293                            getExecutor().execute(callback);
294                        }
295                        callbacks.clear();
296                    }
297                }
298                if (!listeners.isEmpty()) {
299                    // Let the listeners know on a separate thread
300                    Runnable listenerNotifier = new Runnable() {
301                        @Override
302                        public void run() {
303                            for (UsageListener listener : listeners) {
304                                listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
305                            }
306                        }
307                    };
308                    if (started.get()) {
309                        getExecutor().execute(listenerNotifier);
310                    } else {
311                        LOG.warn("Not notifying memory usage change to listeners on shutdown");
312                    }
313                }
314            }
315        }
316    
317        public String getName() {
318            return name;
319        }
320    
321        @Override
322        public String toString() {
323            return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
324                + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
325        }
326    
327        @Override
328        @SuppressWarnings("unchecked")
329        public void start() {
330            if (started.compareAndSet(false, true)) {
331                if (parent != null) {
332                    parent.addChild(this);
333                    if (getLimit() > parent.getLimit()) {
334                        LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
335                    }
336                }
337                for (T t : children) {
338                    t.start();
339                }
340            }
341        }
342    
343        @Override
344        @SuppressWarnings("unchecked")
345        public void stop() {
346            if (started.compareAndSet(true, false)) {
347                if (parent != null) {
348                    parent.removeChild(this);
349                }
350    
351                // clear down any callbacks
352                usageLock.writeLock().lock();
353                try {
354                    waitForSpaceCondition.signalAll();
355                    for (Runnable callback : this.callbacks) {
356                        callback.run();
357                    }
358                    this.callbacks.clear();
359                } finally {
360                    usageLock.writeLock().unlock();
361                }
362    
363                for (T t : children) {
364                    t.stop();
365                }
366            }
367        }
368    
369        protected void addChild(T child) {
370            children.add(child);
371            if (started.get()) {
372                child.start();
373            }
374        }
375    
376        protected void removeChild(T child) {
377            children.remove(child);
378        }
379    
380        /**
381         * @param callback
382         * @return true if the UsageManager was full. The callback will only be called if this method returns true.
383         */
384        public boolean notifyCallbackWhenNotFull(final Runnable callback) {
385            if (parent != null) {
386                Runnable r = new Runnable() {
387    
388                    @Override
389                    public void run() {
390                        usageLock.writeLock().lock();
391                        try {
392                            if (percentUsage >= 100) {
393                                callbacks.add(callback);
394                            } else {
395                                callback.run();
396                            }
397                        } finally {
398                            usageLock.writeLock().unlock();
399                        }
400                    }
401                };
402                if (parent.notifyCallbackWhenNotFull(r)) {
403                    return true;
404                }
405            }
406            usageLock.writeLock().lock();
407            try {
408                if (percentUsage >= 100) {
409                    callbacks.add(callback);
410                    return true;
411                } else {
412                    return false;
413                }
414            } finally {
415                usageLock.writeLock().unlock();
416            }
417        }
418    
419        /**
420         * @return the limiter
421         */
422        public UsageCapacity getLimiter() {
423            return this.limiter;
424        }
425    
426        /**
427         * @param limiter
428         *            the limiter to set
429         */
430        public void setLimiter(UsageCapacity limiter) {
431            this.limiter = limiter;
432        }
433    
434        /**
435         * @return the pollingTime
436         */
437        public int getPollingTime() {
438            return this.pollingTime;
439        }
440    
441        /**
442         * @param pollingTime
443         *            the pollingTime to set
444         */
445        public void setPollingTime(int pollingTime) {
446            this.pollingTime = pollingTime;
447        }
448    
449        public void setName(String name) {
450            this.name = name;
451        }
452    
453        public T getParent() {
454            return parent;
455        }
456    
457        public void setParent(T parent) {
458            this.parent = parent;
459        }
460    
461        public void setExecutor(ThreadPoolExecutor executor) {
462            this.executor = executor;
463        }
464    
465        public ThreadPoolExecutor getExecutor() {
466            return executor;
467        }
468    
469        public boolean isStarted() {
470            return started.get();
471        }
472    }