001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.data;
018    
019    import java.io.File;
020    import java.io.FilenameFilter;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.concurrent.atomic.AtomicLong;
028    
029    import org.apache.activemq.kaha.Marshaller;
030    import org.apache.activemq.kaha.StoreLocation;
031    import org.apache.activemq.kaha.impl.DataManager;
032    import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.util.IOHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Manages DataFiles
040     * 
041     * 
042     */
043    public final class DataManagerImpl implements DataManager {
044    
045        public static final int ITEM_HEAD_SIZE = 5; // type + length
046        public static final byte DATA_ITEM_TYPE = 1;
047        public static final byte REDO_ITEM_TYPE = 2;
048        public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
049        
050        private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class);
051        private static final String NAME_PREFIX = "data-";
052        
053        private final File directory;
054        private final String name;
055        private SyncDataFileReader reader;
056        private SyncDataFileWriter writer;
057        private DataFile currentWriteFile;
058        private long maxFileLength = MAX_FILE_LENGTH;
059        private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
060        private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
061        private String dataFilePrefix;
062        private final AtomicLong storeSize;
063    
064        public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
065            this.directory = dir;
066            this.name = name;
067            this.storeSize=storeSize;
068    
069            dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
070            // build up list of current dataFiles
071            File[] files = dir.listFiles(new FilenameFilter() {
072                public boolean accept(File dir, String n) {
073                    return dir.equals(directory) && n.startsWith(dataFilePrefix);
074                }
075            });
076            if (files != null) {
077                for (int i = 0; i < files.length; i++) {
078                    File file = files[i];
079                    String n = file.getName();
080                    String numStr = n.substring(dataFilePrefix.length(), n.length());
081                    int num = Integer.parseInt(numStr);
082                    DataFile dataFile = new DataFile(file, num);
083                    storeSize.addAndGet(dataFile.getLength());
084                    fileMap.put(dataFile.getNumber(), dataFile);
085                    if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
086                        currentWriteFile = dataFile;
087                    }
088                }
089            }
090        }
091    
092        private DataFile createAndAddDataFile(int num) {
093            String fileName = dataFilePrefix + num;
094            File file = new File(directory, fileName);
095            DataFile result = new DataFile(file, num);
096            fileMap.put(result.getNumber(), result);
097            return result;
098        }
099    
100        /*
101         * (non-Javadoc)
102         * 
103         * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
104         */
105        public String getName() {
106            return name;
107        }
108    
109        synchronized DataFile findSpaceForData(DataItem item) throws IOException {
110            if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
111                int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
112                if (currentWriteFile != null && currentWriteFile.isUnused()) {
113                    removeDataFile(currentWriteFile);
114                }
115                currentWriteFile = createAndAddDataFile(nextNum);
116            }
117            item.setOffset(currentWriteFile.getLength());
118            item.setFile(currentWriteFile.getNumber().intValue());
119            int len = item.getSize() + ITEM_HEAD_SIZE;
120            currentWriteFile.incrementLength(len);
121            storeSize.addAndGet(len);
122            return currentWriteFile;
123        }
124    
125        DataFile getDataFile(StoreLocation item) throws IOException {
126            Integer key = Integer.valueOf(item.getFile());
127            DataFile dataFile = fileMap.get(key);
128            if (dataFile == null) {
129                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
130                throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
131            }
132            return dataFile;
133        }
134    
135        /*
136         * (non-Javadoc)
137         * 
138         * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
139         *      org.apache.activemq.kaha.StoreLocation)
140         */
141        public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
142            return getReader().readItem(marshaller, item);
143        }
144    
145        /*
146         * (non-Javadoc)
147         * 
148         * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
149         *      java.lang.Object)
150         */
151        public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
152            return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
153        }
154    
155        /*
156         * (non-Javadoc)
157         * 
158         * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
159         */
160        public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
161            return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
162        }
163    
164        /*
165         * (non-Javadoc)
166         * 
167         * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation,
168         *      org.apache.activemq.kaha.Marshaller, java.lang.Object)
169         */
170        public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
171            throws IOException {
172            getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
173        }
174    
175        /*
176         * (non-Javadoc)
177         * 
178         * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
179         */
180        public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
181    
182            // Nothing to recover if there is no current file.
183            if (currentWriteFile == null) {
184                return;
185            }
186    
187            DataItem item = new DataItem();
188            item.setFile(currentWriteFile.getNumber().intValue());
189            item.setOffset(0);
190            while (true) {
191                byte type;
192                try {
193                    type = getReader().readDataItemSize(item);
194                } catch (IOException ignore) {
195                    LOG.trace("End of data file reached at (header was invalid): " + item);
196                    return;
197                }
198                if (type == REDO_ITEM_TYPE) {
199                    // Un-marshal the redo item
200                    Object object;
201                    try {
202                        object = readItem(redoMarshaller, item);
203                    } catch (IOException e1) {
204                        LOG.trace("End of data file reached at (payload was invalid): " + item);
205                        return;
206                    }
207                    try {
208    
209                        listener.onRedoItem(item, object);
210                        // in case the listener is holding on to item references,
211                        // copy it
212                        // so we don't change it behind the listener's back.
213                        item = item.copy();
214    
215                    } catch (Exception e) {
216                        throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
217                    }
218                }
219                // Move to the next item.
220                item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
221            }
222        }
223    
224        /*
225         * (non-Javadoc)
226         * 
227         * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
228         */
229        public synchronized void close() throws IOException {
230            getWriter().close();
231            for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
232                DataFile dataFile = i.next();
233                getWriter().force(dataFile);
234                dataFile.close();
235            }
236            fileMap.clear();
237        }
238    
239        /*
240         * (non-Javadoc)
241         * 
242         * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
243         */
244        public synchronized void force() throws IOException {
245            for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
246                DataFile dataFile = i.next();
247                getWriter().force(dataFile);
248            }
249        }
250    
251        /*
252         * (non-Javadoc)
253         * 
254         * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
255         */
256        public synchronized boolean delete() throws IOException {
257            boolean result = true;
258            for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
259                DataFile dataFile = i.next();
260                storeSize.addAndGet(-dataFile.getLength());
261                result &= dataFile.delete();
262            }
263            fileMap.clear();
264            return result;
265        }
266    
267        /*
268         * (non-Javadoc)
269         * 
270         * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
271         */
272        public synchronized void addInterestInFile(int file) throws IOException {
273            if (file >= 0) {
274                Integer key = Integer.valueOf(file);
275                DataFile dataFile = fileMap.get(key);
276                if (dataFile == null) {
277                    dataFile = createAndAddDataFile(file);
278                }
279                addInterestInFile(dataFile);
280            }
281        }
282    
283        synchronized void addInterestInFile(DataFile dataFile) {
284            if (dataFile != null) {
285                dataFile.increment();
286            }
287        }
288    
289        /*
290         * (non-Javadoc)
291         * 
292         * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
293         */
294        public synchronized void removeInterestInFile(int file) throws IOException {
295            if (file >= 0) {
296                Integer key = Integer.valueOf(file);
297                DataFile dataFile = fileMap.get(key);
298                removeInterestInFile(dataFile);
299            }
300        }
301    
302        synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
303            if (dataFile != null) {
304               
305                if (dataFile.decrement() <= 0) {
306                    if (dataFile != currentWriteFile) {
307                        removeDataFile(dataFile);
308                    }
309                }
310            }
311        }
312    
313        /*
314         * (non-Javadoc)
315         * 
316         * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
317         */
318        public synchronized void consolidateDataFiles() throws IOException {
319            List<DataFile> purgeList = new ArrayList<DataFile>();
320            for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
321                DataFile dataFile = i.next();
322                if (dataFile.isUnused() && dataFile != currentWriteFile) {
323                    purgeList.add(dataFile);
324                }
325            }
326            for (int i = 0; i < purgeList.size(); i++) {
327                DataFile dataFile = purgeList.get(i);
328                removeDataFile(dataFile);
329            }
330        }
331    
332        private void removeDataFile(DataFile dataFile) throws IOException {
333            fileMap.remove(dataFile.getNumber());
334            if (writer != null) {
335                writer.force(dataFile);
336            }
337            storeSize.addAndGet(-dataFile.getLength());
338            boolean result = dataFile.delete();
339            LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
340        }
341    
342        /*
343         * (non-Javadoc)
344         * 
345         * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
346         */
347        public Marshaller getRedoMarshaller() {
348            return redoMarshaller;
349        }
350    
351        /*
352         * (non-Javadoc)
353         * 
354         * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
355         */
356        public void setRedoMarshaller(Marshaller redoMarshaller) {
357            this.redoMarshaller = redoMarshaller;
358        }
359    
360        /**
361         * @return the maxFileLength
362         */
363        public long getMaxFileLength() {
364            return maxFileLength;
365        }
366    
367        /**
368         * @param maxFileLength the maxFileLength to set
369         */
370        public void setMaxFileLength(long maxFileLength) {
371            this.maxFileLength = maxFileLength;
372        }
373    
374        public String toString() {
375            return "DataManager:(" + NAME_PREFIX + name + ")";
376        }
377    
378        public synchronized SyncDataFileReader getReader() {
379            if (reader == null) {
380                reader = createReader();
381            }
382            return reader;
383        }
384    
385        protected synchronized SyncDataFileReader createReader() {
386            return new SyncDataFileReader(this);
387        }
388    
389        public synchronized void setReader(SyncDataFileReader reader) {
390            this.reader = reader;
391        }
392    
393        public synchronized SyncDataFileWriter getWriter() {
394            if (writer == null) {
395                writer = createWriter();
396            }
397            return writer;
398        }
399    
400        private SyncDataFileWriter createWriter() {
401            return new SyncDataFileWriter(this);
402        }
403    
404        public synchronized void setWriter(SyncDataFileWriter writer) {
405            this.writer = writer;
406        }
407    
408    }