001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.usage;
018
019import java.util.LinkedList;
020import java.util.List;
021import java.util.concurrent.CopyOnWriteArrayList;
022import java.util.concurrent.ThreadPoolExecutor;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.locks.Condition;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import org.apache.activemq.Service;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
034 * Main use case is manage memory usage.
035 *
036 * @org.apache.xbean.XBean
037 *
038 */
039public abstract class Usage<T extends Usage> implements Service {
040
041    private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
042
043    protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
044    protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
045    protected int percentUsage;
046    protected T parent;
047    protected String name;
048
049    private UsageCapacity limiter = new DefaultUsageCapacity();
050    private int percentUsageMinDelta = 1;
051    private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
052    private final boolean debug = LOG.isDebugEnabled();
053    private float usagePortion = 1.0f;
054    private final List<T> children = new CopyOnWriteArrayList<T>();
055    private final List<Runnable> callbacks = new LinkedList<Runnable>();
056    private int pollingTime = 100;
057    private final AtomicBoolean started = new AtomicBoolean();
058    private ThreadPoolExecutor executor;
059
060    public Usage(T parent, String name, float portion) {
061        this.parent = parent;
062        this.usagePortion = portion;
063        if (parent != null) {
064            this.limiter.setLimit((long) (parent.getLimit() * (double)portion));
065            name = parent.name + ":" + name;
066        }
067        this.name = name;
068    }
069
070    protected abstract long retrieveUsage();
071
072    /**
073     * @throws InterruptedException
074     */
075    public void waitForSpace() throws InterruptedException {
076        waitForSpace(0);
077    }
078
079    public boolean waitForSpace(long timeout) throws InterruptedException {
080        return waitForSpace(timeout, 100);
081    }
082
083    /**
084     * @param timeout
085     * @throws InterruptedException
086     * @return true if space
087     */
088    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
089        if (parent != null) {
090            if (!parent.waitForSpace(timeout, highWaterMark)) {
091                return false;
092            }
093        }
094        usageLock.writeLock().lock();
095        try {
096            percentUsage = caclPercentUsage();
097            if (percentUsage >= highWaterMark) {
098                long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
099                long timeleft = deadline;
100                while (timeleft > 0) {
101                    percentUsage = caclPercentUsage();
102                    if (percentUsage >= highWaterMark) {
103                        waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS);
104                        timeleft = deadline - System.currentTimeMillis();
105                    } else {
106                        break;
107                    }
108                }
109            }
110            return percentUsage < highWaterMark;
111        } finally {
112            usageLock.writeLock().unlock();
113        }
114    }
115
116    public boolean isFull() {
117        return isFull(100);
118    }
119
120    public boolean isFull(int highWaterMark) {
121        if (parent != null && parent.isFull(highWaterMark)) {
122            return true;
123        }
124        usageLock.writeLock().lock();
125        try {
126            percentUsage = caclPercentUsage();
127            return percentUsage >= highWaterMark;
128        } finally {
129            usageLock.writeLock().unlock();
130        }
131    }
132
133    public void addUsageListener(UsageListener listener) {
134        listeners.add(listener);
135    }
136
137    public void removeUsageListener(UsageListener listener) {
138        listeners.remove(listener);
139    }
140
141    public int getNumUsageListeners() {
142        return listeners.size();
143    }
144
145    public long getLimit() {
146        usageLock.readLock().lock();
147        try {
148            return limiter.getLimit();
149        } finally {
150            usageLock.readLock().unlock();
151        }
152    }
153
154    /**
155     * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
156     * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
157     * "1g" can be used
158     *
159     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
160     */
161    public void setLimit(long limit) {
162        if (percentUsageMinDelta < 0) {
163            throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
164        }
165        usageLock.writeLock().lock();
166        try {
167            this.limiter.setLimit(limit);
168            this.usagePortion = 0;
169        } finally {
170            usageLock.writeLock().unlock();
171        }
172        onLimitChange();
173    }
174
175    protected void onLimitChange() {
176        // We may need to calculate the limit
177        if (usagePortion > 0 && parent != null) {
178            usageLock.writeLock().lock();
179            try {
180                this.limiter.setLimit((long) (parent.getLimit() * (double) usagePortion));
181            } finally {
182                usageLock.writeLock().unlock();
183            }
184        }
185        // Reset the percent currently being used.
186        usageLock.writeLock().lock();
187        try {
188            setPercentUsage(caclPercentUsage());
189        } finally {
190            usageLock.writeLock().unlock();
191        }
192        // Let the children know that the limit has changed. They may need to
193        // set their limits based on ours.
194        for (T child : children) {
195            child.onLimitChange();
196        }
197    }
198
199    public float getUsagePortion() {
200        usageLock.readLock().lock();
201        try {
202            return usagePortion;
203        } finally {
204            usageLock.readLock().unlock();
205        }
206    }
207
208    public void setUsagePortion(float usagePortion) {
209        usageLock.writeLock().lock();
210        try {
211            this.usagePortion = usagePortion;
212        } finally {
213            usageLock.writeLock().unlock();
214        }
215        onLimitChange();
216    }
217
218    public int getPercentUsage() {
219        usageLock.readLock().lock();
220        try {
221            return percentUsage;
222        } finally {
223            usageLock.readLock().unlock();
224        }
225    }
226
227    public int getPercentUsageMinDelta() {
228        usageLock.readLock().lock();
229        try {
230            return percentUsageMinDelta;
231        } finally {
232            usageLock.readLock().unlock();
233        }
234    }
235
236    /**
237     * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
238     * manager.
239     *
240     * @param percentUsageMinDelta
241     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
242     */
243    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
244        if (percentUsageMinDelta < 1) {
245            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
246        }
247
248        usageLock.writeLock().lock();
249        try {
250            this.percentUsageMinDelta = percentUsageMinDelta;
251            setPercentUsage(caclPercentUsage());
252        } finally {
253            usageLock.writeLock().unlock();
254        }
255    }
256
257    public long getUsage() {
258        usageLock.readLock().lock();
259        try {
260            return retrieveUsage();
261        } finally {
262            usageLock.readLock().unlock();
263        }
264    }
265
266    protected void setPercentUsage(int value) {
267        usageLock.writeLock().lock();
268        try {
269            int oldValue = percentUsage;
270            percentUsage = value;
271            if (oldValue != value) {
272                fireEvent(oldValue, value);
273            }
274        } finally {
275            usageLock.writeLock().unlock();
276        }
277    }
278
279    protected int caclPercentUsage() {
280        if (limiter.getLimit() == 0) {
281            return 0;
282        }
283        return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
284    }
285
286    // Must be called with the usage lock's writeLock held.
287    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
288        if (debug) {
289            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
290        }
291        if (started.get()) {
292            // Switching from being full to not being full..
293            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
294                waitForSpaceCondition.signalAll();
295                if (!callbacks.isEmpty()) {
296                    for (Runnable callback : callbacks) {
297                        getExecutor().execute(callback);
298                    }
299                    callbacks.clear();
300                }
301            }
302            if (!listeners.isEmpty()) {
303                // Let the listeners know on a separate thread
304                Runnable listenerNotifier = new Runnable() {
305                    @Override
306                    public void run() {
307                        for (UsageListener listener : listeners) {
308                            listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
309                        }
310                    }
311                };
312                if (started.get()) {
313                    getExecutor().execute(listenerNotifier);
314                } else {
315                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
316                }
317            }
318        }
319    }
320
321    public String getName() {
322        return name;
323    }
324
325    @Override
326    public String toString() {
327        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
328            + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
329    }
330
331    @Override
332    @SuppressWarnings("unchecked")
333    public void start() {
334        if (started.compareAndSet(false, true)) {
335            if (parent != null) {
336                parent.addChild(this);
337                if (getLimit() > parent.getLimit()) {
338                    LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
339                }
340            }
341            for (T t : children) {
342                t.start();
343            }
344        }
345    }
346
347    @Override
348    @SuppressWarnings("unchecked")
349    public void stop() {
350        if (started.compareAndSet(true, false)) {
351            if (parent != null) {
352                parent.removeChild(this);
353            }
354
355            // clear down any callbacks
356            usageLock.writeLock().lock();
357            try {
358                waitForSpaceCondition.signalAll();
359                for (Runnable callback : this.callbacks) {
360                    callback.run();
361                }
362                this.callbacks.clear();
363            } finally {
364                usageLock.writeLock().unlock();
365            }
366
367            for (T t : children) {
368                t.stop();
369            }
370        }
371    }
372
373    protected void addChild(T child) {
374        children.add(child);
375        if (started.get()) {
376            child.start();
377        }
378    }
379
380    protected void removeChild(T child) {
381        children.remove(child);
382    }
383
384    /**
385     * @param callback
386     * @return true if the UsageManager was full. The callback will only be called if this method returns true.
387     */
388    public boolean notifyCallbackWhenNotFull(final Runnable callback) {
389        if (parent != null) {
390            Runnable r = new Runnable() {
391
392                @Override
393                public void run() {
394                    usageLock.writeLock().lock();
395                    try {
396                        if (percentUsage >= 100) {
397                            callbacks.add(callback);
398                        } else {
399                            callback.run();
400                        }
401                    } finally {
402                        usageLock.writeLock().unlock();
403                    }
404                }
405            };
406            if (parent.notifyCallbackWhenNotFull(r)) {
407                return true;
408            }
409        }
410        usageLock.writeLock().lock();
411        try {
412            if (percentUsage >= 100) {
413                callbacks.add(callback);
414                return true;
415            } else {
416                return false;
417            }
418        } finally {
419            usageLock.writeLock().unlock();
420        }
421    }
422
423    /**
424     * @return the limiter
425     */
426    public UsageCapacity getLimiter() {
427        return this.limiter;
428    }
429
430    /**
431     * @param limiter
432     *            the limiter to set
433     */
434    public void setLimiter(UsageCapacity limiter) {
435        this.limiter = limiter;
436    }
437
438    /**
439     * @return the pollingTime
440     */
441    public int getPollingTime() {
442        return this.pollingTime;
443    }
444
445    /**
446     * @param pollingTime
447     *            the pollingTime to set
448     */
449    public void setPollingTime(int pollingTime) {
450        this.pollingTime = pollingTime;
451    }
452
453    public void setName(String name) {
454        this.name = name;
455    }
456
457    public T getParent() {
458        return parent;
459    }
460
461    public void setParent(T parent) {
462        this.parent = parent;
463    }
464
465    public void setExecutor(ThreadPoolExecutor executor) {
466        this.executor = executor;
467    }
468
469    public ThreadPoolExecutor getExecutor() {
470        return executor;
471    }
472
473    public boolean isStarted() {
474        return started.get();
475    }
476}