001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.disk.journal;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    
026    /**
027     * Used to pool DataFileAccessors.
028     *
029     * @author chirino
030     */
031    public class DataFileAccessorPool {
032    
033        private final Journal journal;
034        private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
035        private boolean closed;
036        private int maxOpenReadersPerFile = 5;
037    
038        class Pool {
039    
040            private final DataFile file;
041            private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
042            private boolean used;
043            private int openCounter;
044            private boolean disposed;
045    
046            public Pool(DataFile file) {
047                this.file = file;
048            }
049    
050            public DataFileAccessor openDataFileReader() throws IOException {
051                DataFileAccessor rc = null;
052                if (pool.isEmpty()) {
053                    rc = new DataFileAccessor(journal, file);
054                } else {
055                    rc = pool.remove(pool.size() - 1);
056                }
057                used = true;
058                openCounter++;
059                return rc;
060            }
061    
062            public synchronized void closeDataFileReader(DataFileAccessor reader) {
063                openCounter--;
064                if (pool.size() >= maxOpenReadersPerFile || disposed) {
065                    reader.dispose();
066                } else {
067                    pool.add(reader);
068                }
069            }
070    
071            public synchronized void clearUsedMark() {
072                used = false;
073            }
074    
075            public synchronized boolean isUsed() {
076                return used;
077            }
078    
079            public synchronized void dispose() {
080                for (DataFileAccessor reader : pool) {
081                    reader.dispose();
082                }
083                pool.clear();
084                disposed = true;
085            }
086    
087            public synchronized int getOpenCounter() {
088                return openCounter;
089            }
090    
091        }
092    
093        public DataFileAccessorPool(Journal dataManager) {
094            this.journal = dataManager;
095        }
096    
097        synchronized void clearUsedMark() {
098            for (Pool pool : pools.values()) {
099                pool.clearUsedMark();
100            }
101        }
102    
103        synchronized void disposeUnused() {
104            for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
105                Pool pool = iter.next();
106                if (!pool.isUsed()) {
107                    pool.dispose();
108                    iter.remove();
109                }
110            }
111        }
112    
113        synchronized void disposeDataFileAccessors(DataFile dataFile) {
114            if (closed) {
115                throw new IllegalStateException("Closed.");
116            }
117            Pool pool = pools.get(dataFile.getDataFileId());
118            if (pool != null) {
119                if (pool.getOpenCounter() == 0) {
120                    pool.dispose();
121                    pools.remove(dataFile.getDataFileId());
122                } else {
123                    throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
124                }
125            }
126        }
127    
128        synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
129            if (closed) {
130                throw new IOException("Closed.");
131            }
132    
133            Pool pool = pools.get(dataFile.getDataFileId());
134            if (pool == null) {
135                pool = new Pool(dataFile);
136                pools.put(dataFile.getDataFileId(), pool);
137            }
138            return pool.openDataFileReader();
139        }
140    
141        synchronized void closeDataFileAccessor(DataFileAccessor reader) {
142            Pool pool = pools.get(reader.getDataFile().getDataFileId());
143            if (pool == null || closed) {
144                reader.dispose();
145            } else {
146                pool.closeDataFileReader(reader);
147            }
148        }
149    
150        public synchronized void close() {
151            if (closed) {
152                return;
153            }
154            closed = true;
155            for (Pool pool : pools.values()) {
156                pool.dispose();
157            }
158            pools.clear();
159        }
160    
161    }