001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.async;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    
026    /**
027     * Used to pool DataFileAccessors.
028     * 
029     * @author chirino
030     */
031    public class DataFileAccessorPool {
032    
033        private final AsyncDataManager dataManager;
034        private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
035        private boolean closed;
036        private int maxOpenReadersPerFile = 5;
037    
038        class Pool {
039    
040            private final DataFile file;
041            private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
042            private boolean used;
043            private int openCounter;
044            private boolean disposed;
045    
046            public Pool(DataFile file) {
047                this.file = file;
048            }
049    
050            public DataFileAccessor openDataFileReader() throws IOException {
051                DataFileAccessor rc = null;
052                if (pool.isEmpty()) {
053                    rc = new DataFileAccessor(dataManager, file);
054                } else {
055                    rc = (DataFileAccessor)pool.remove(pool.size() - 1);
056                }
057                used = true;
058                openCounter++;
059                return rc;
060            }
061    
062            public synchronized void closeDataFileReader(DataFileAccessor reader) {
063                openCounter--;
064                if (pool.size() >= maxOpenReadersPerFile || disposed) {
065                    reader.dispose();
066                } else {
067                    pool.add(reader);
068                }
069            }
070    
071            public synchronized void clearUsedMark() {
072                used = false;
073            }
074    
075            public synchronized boolean isUsed() {
076                return used;
077            }
078    
079            public synchronized void dispose() {
080                for (DataFileAccessor reader : pool) {
081                    reader.dispose();
082                }
083                pool.clear();
084                disposed = true;
085            }
086    
087            public synchronized int getOpenCounter() {
088                return openCounter;
089            }
090    
091        }
092    
093        public DataFileAccessorPool(AsyncDataManager dataManager) {
094            this.dataManager = dataManager;
095        }
096    
097        synchronized void clearUsedMark() {
098            for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
099                Pool pool = (Pool)iter.next();
100                pool.clearUsedMark();
101            }
102        }
103    
104        synchronized void disposeUnused() {
105            for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
106                Pool pool = iter.next();
107                if (!pool.isUsed()) {
108                    pool.dispose();
109                    iter.remove();
110                }
111            }
112        }
113    
114        synchronized void disposeDataFileAccessors(DataFile dataFile) {
115            if (closed) {
116                throw new IllegalStateException("Closed.");
117            }
118            Pool pool = pools.get(dataFile.getDataFileId());
119            if (pool != null) {
120                if (pool.getOpenCounter() == 0) {
121                    pool.dispose();
122                    pools.remove(dataFile.getDataFileId());
123                } else {
124                    throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
125                }
126            }
127        }
128    
129        synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
130            if (closed) {
131                throw new IOException("Closed.");
132            }
133    
134            Pool pool = pools.get(dataFile.getDataFileId());
135            if (pool == null) {
136                pool = new Pool(dataFile);
137                pools.put(dataFile.getDataFileId(), pool);
138            }
139            return pool.openDataFileReader();
140        }
141    
142        synchronized void closeDataFileAccessor(DataFileAccessor reader) {
143            Pool pool = pools.get(reader.getDataFile().getDataFileId());
144            if (pool == null || closed) {
145                reader.dispose();
146            } else {
147                pool.closeDataFileReader(reader);
148            }
149        }
150    
151        public synchronized void close() {
152            if (closed) {
153                return;
154            }
155            closed = true;
156            for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
157                Pool pool = iter.next();
158                pool.dispose();
159            }
160            pools.clear();
161        }
162    
163    }