001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.data;
018    
019    import java.io.IOException;
020    import java.io.RandomAccessFile;
021    
022    import org.apache.activemq.kaha.Marshaller;
023    import org.apache.activemq.util.DataByteArrayOutputStream;
024    
025    /**
026     * Optimized Store writer. Synchronously marshalls and writes to the data file.
027     * Simple but may introduce a bit of contention when put under load.
028     * 
029     * 
030     */
031    public final class SyncDataFileWriter {
032    
033        private DataByteArrayOutputStream buffer;
034        private DataManagerImpl dataManager;
035    
036        /**
037         * Construct a Store writer
038         * 
039         * @param fileId
040         */
041        SyncDataFileWriter(DataManagerImpl fileManager) {
042            this.dataManager = fileManager;
043            this.buffer = new DataByteArrayOutputStream();
044        }
045    
046        /*
047         * (non-Javadoc)
048         * 
049         * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
050         *      java.lang.Object, byte)
051         */
052        public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
053            throws IOException {
054    
055            // Write the packet our internal buffer.
056            buffer.reset();
057            buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
058            marshaller.writePayload(payload, buffer);
059            int size = buffer.size();
060            int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
061            buffer.reset();
062            buffer.writeByte(type);
063            buffer.writeInt(payloadSize);
064    
065            // Find the position where this item will land at.
066            DataItem item = new DataItem();
067            item.setSize(payloadSize);
068            DataFile dataFile = dataManager.findSpaceForData(item);
069    
070            // Now splat the buffer to the file.
071            dataFile.getRandomAccessFile().seek(item.getOffset());
072            dataFile.getRandomAccessFile().write(buffer.getData(), 0, size);
073            dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
074    
075            dataManager.addInterestInFile(dataFile);
076            return item;
077        }
078    
079        /*
080         * (non-Javadoc)
081         * 
082         * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
083         *      org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
084         */
085        public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
086            throws IOException {
087            // Write the packet our internal buffer.
088            buffer.reset();
089            buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
090            marshaller.writePayload(payload, buffer);
091            int size = buffer.size();
092            int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
093            buffer.reset();
094            buffer.writeByte(type);
095            buffer.writeInt(payloadSize);
096            item.setSize(payloadSize);
097            DataFile dataFile = dataManager.getDataFile(item);
098            RandomAccessFile file = dataFile.getRandomAccessFile();
099            file.seek(item.getOffset());
100            file.write(buffer.getData(), 0, size);
101            dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
102        }
103    
104        public synchronized void force(DataFile dataFile) throws IOException {
105            // If our dirty marker was set.. then we need to sync
106            if (dataFile.getWriterData() != null && dataFile.isDirty()) {
107                dataFile.getRandomAccessFile().getFD().sync();
108                dataFile.setWriterData(null);
109                dataFile.setDirty(false);
110            }
111        }
112    
113        public void close() throws IOException {
114        }
115    }