001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.data;
018    
019    import java.io.IOException;
020    import java.io.RandomAccessFile;
021    import org.apache.activemq.kaha.Marshaller;
022    import org.apache.activemq.kaha.StoreLocation;
023    import org.apache.activemq.util.DataByteArrayInputStream;
024    
025    /**
026     * Optimized Store reader
027     * 
028     * 
029     */
030    public final class SyncDataFileReader {
031    
032        private DataManagerImpl dataManager;
033        private DataByteArrayInputStream dataIn;
034    
035        /**
036         * Construct a Store reader
037         * 
038         * @param fileId
039         */
040        SyncDataFileReader(DataManagerImpl fileManager) {
041            this.dataManager = fileManager;
042            this.dataIn = new DataByteArrayInputStream();
043        }
044    
045        /*
046         * (non-Javadoc)
047         * 
048         * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
049         */
050        public synchronized byte readDataItemSize(DataItem item) throws IOException {
051            RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
052            file.seek(item.getOffset()); // jump to the size field
053            byte rc = file.readByte();
054            item.setSize(file.readInt());
055            return rc;
056        }
057    
058        /*
059         * (non-Javadoc)
060         * 
061         * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller,
062         *      org.apache.activemq.kaha.StoreLocation)
063         */
064        public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
065            RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
066    
067            // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
068            // allocating byte[] arrays on every readItem.
069            byte[] data = new byte[item.getSize()];
070            file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE);
071            file.readFully(data);
072            dataIn.restart(data);
073            return marshaller.readPayload(dataIn);
074        }
075    }