001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.async;
018    
019    import java.io.File;
020    import java.io.FilenameFilter;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collections;
024    import java.util.List;
025    
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * An AsyncDataManager that works in read only mode against multiple data directories.
031     * Useful for reading back archived data files.
032     */
033    public class ReadOnlyAsyncDataManager extends AsyncDataManager {
034        
035        private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyAsyncDataManager.class);
036        private final ArrayList<File> dirs;
037    
038        public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
039            this.dirs = dirs;
040        }
041    
042        @SuppressWarnings("unchecked")
043        public synchronized void start() throws IOException {
044            if (started) {
045                return;
046            }
047    
048            started = true;
049    
050            accessorPool = new DataFileAccessorPool(this);
051                    
052            ArrayList<File> files = new ArrayList<File>();
053            for (File directory : dirs) {
054                final File d = directory;
055                File[] f = d.listFiles(new FilenameFilter() {
056                    public boolean accept(File dir, String n) {
057                        return dir.equals(d) && n.startsWith(filePrefix);
058                    }
059                });
060                for (int i = 0; i < f.length; i++) {
061                    files.add(f[i]);
062                }
063            }
064           
065            for (File file : files) {
066                try {
067                    String n = file.getName();
068                    String numStr = n.substring(filePrefix.length(), n.length());
069                    int num = Integer.parseInt(numStr);
070                    DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
071                    fileMap.put(dataFile.getDataFileId(), dataFile);
072                    storeSize.addAndGet(dataFile.getLength());
073                } catch (NumberFormatException e) {
074                    // Ignore file that do not match the pattern.
075                }
076            }
077    
078            // Sort the list so that we can link the DataFiles together in the
079            // right order.
080            List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
081            Collections.sort(dataFiles);
082            currentWriteFile = null;
083            for (DataFile df : dataFiles) {
084                if (currentWriteFile != null) {
085                    currentWriteFile.linkAfter(df);
086                }
087                currentWriteFile = df;
088                fileByFileMap.put(df.getFile(), df);
089            }
090            
091            // Need to check the current Write File to see if there was a partial
092            // write to it.
093            if (currentWriteFile != null) {
094    
095                // See if the lastSyncedLocation is valid..
096                Location l = lastAppendLocation.get();
097                if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
098                    l = null;
099                }
100            }
101        }
102        
103        public synchronized void close() throws IOException {
104            if (!started) {
105                return;
106            }
107            accessorPool.close();
108            fileMap.clear();
109            fileByFileMap.clear();
110            started = false;
111        }
112    
113        
114        public Location getFirstLocation() throws IllegalStateException, IOException {
115            if( currentWriteFile == null ) {
116                return null;
117            }
118            
119            DataFile first = (DataFile)currentWriteFile.getHeadNode();
120            Location cur = new Location();
121            cur.setDataFileId(first.getDataFileId());
122            cur.setOffset(0);
123            cur.setSize(0);
124            return getNextLocation(cur);
125        }
126        
127        @Override
128        public synchronized boolean delete() throws IOException {
129            throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
130        }    
131    }