001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.async;
018    
019    import java.io.File;
020    import java.io.FilenameFilter;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collections;
024    import java.util.List;
025    
026    import org.apache.activemq.thread.Scheduler;
027    import org.apache.activemq.util.ByteSequence;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * An AsyncDataManager that works in read only mode against multiple data directories.
033     * Useful for reading back archived data files.
034     */
035    public class ReadOnlyAsyncDataManager extends AsyncDataManager {
036        
037        private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyAsyncDataManager.class);
038        private final ArrayList<File> dirs;
039    
040        public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
041            this.dirs = dirs;
042        }
043    
044        @SuppressWarnings("unchecked")
045        public synchronized void start() throws IOException {
046            if (started) {
047                return;
048            }
049    
050            started = true;
051    
052            accessorPool = new DataFileAccessorPool(this);
053                    
054            ArrayList<File> files = new ArrayList<File>();
055            for (File directory : dirs) {
056                final File d = directory;
057                File[] f = d.listFiles(new FilenameFilter() {
058                    public boolean accept(File dir, String n) {
059                        return dir.equals(d) && n.startsWith(filePrefix);
060                    }
061                });
062                for (int i = 0; i < f.length; i++) {
063                    files.add(f[i]);
064                }
065            }
066           
067            for (File file : files) {
068                try {
069                    String n = file.getName();
070                    String numStr = n.substring(filePrefix.length(), n.length());
071                    int num = Integer.parseInt(numStr);
072                    DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
073                    fileMap.put(dataFile.getDataFileId(), dataFile);
074                    storeSize.addAndGet(dataFile.getLength());
075                } catch (NumberFormatException e) {
076                    // Ignore file that do not match the pattern.
077                }
078            }
079    
080            // Sort the list so that we can link the DataFiles together in the
081            // right order.
082            List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
083            Collections.sort(dataFiles);
084            currentWriteFile = null;
085            for (DataFile df : dataFiles) {
086                if (currentWriteFile != null) {
087                    currentWriteFile.linkAfter(df);
088                }
089                currentWriteFile = df;
090                fileByFileMap.put(df.getFile(), df);
091            }
092            
093            // Need to check the current Write File to see if there was a partial
094            // write to it.
095            if (currentWriteFile != null) {
096    
097                // See if the lastSyncedLocation is valid..
098                Location l = lastAppendLocation.get();
099                if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
100                    l = null;
101                }
102            }
103        }
104        
105        public synchronized void close() throws IOException {
106            if (!started) {
107                return;
108            }
109            accessorPool.close();
110            fileMap.clear();
111            fileByFileMap.clear();
112            started = false;
113        }
114    
115        
116        public Location getFirstLocation() throws IllegalStateException, IOException {
117            if( currentWriteFile == null ) {
118                return null;
119            }
120            
121            DataFile first = (DataFile)currentWriteFile.getHeadNode();
122            Location cur = new Location();
123            cur.setDataFileId(first.getDataFileId());
124            cur.setOffset(0);
125            cur.setSize(0);
126            return getNextLocation(cur);
127        }
128        
129        @Override
130        public synchronized boolean delete() throws IOException {
131            throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
132        }    
133    }