001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq.reader;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.Set;
024    import javax.jms.InvalidSelectorException;
025    import javax.jms.Message;
026    
027    import org.apache.activemq.command.DataStructure;
028    import org.apache.activemq.filter.BooleanExpression;
029    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
030    import org.apache.activemq.kaha.impl.async.Location;
031    import org.apache.activemq.openwire.OpenWireFormat;
032    import org.apache.activemq.selector.SelectorParser;
033    import org.apache.activemq.util.ByteSequence;
034    import org.apache.activemq.wireformat.WireFormat;
035    
036    /**
037     * Reads and iterates through data log files for the AMQMessage Store
038     * 
039     */
040    public class AMQReader implements Iterable<Message> {
041    
042        private AsyncDataManager dataManager;
043        private WireFormat wireFormat = new OpenWireFormat();
044        private File file;
045        private BooleanExpression expression;
046    
047        /**
048         * List all the data files in a directory
049         * @param directory
050         * @return
051         * @throws IOException
052         */
053        public static Set<File> listDataFiles(File directory) throws IOException{
054            Set<File>result = new HashSet<File>();
055            if (directory == null || !directory.exists() || !directory.isDirectory()) {
056                throw new IOException("Invalid Directory " + directory);
057            }
058            AsyncDataManager dataManager = new AsyncDataManager();
059            dataManager.setDirectory(directory);
060            dataManager.start();
061            Set<File> set = dataManager.getFiles();
062            if (set != null) {
063                result.addAll(set);
064            }
065            dataManager.close();
066            return result;
067        }
068        /**
069         * Create the AMQReader to read a directory of amq data logs - or an
070         * individual data log file
071         * 
072         * @param file the directory - or file
073         * @throws IOException 
074         * @throws InvalidSelectorException 
075         * @throws IOException
076         * @throws InvalidSelectorException 
077         */
078        public AMQReader(File file) throws InvalidSelectorException, IOException {
079            this(file,null);
080        }
081        
082        /**
083         * Create the AMQReader to read a directory of amq data logs - or an
084         * individual data log file
085         * 
086         * @param file the directory - or file
087         * @param selector the JMS selector or null to select all
088         * @throws IOException
089         * @throws InvalidSelectorException 
090         */
091        public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
092            String str = selector != null ? selector.trim() : null;
093            if (str != null && str.length() > 0) {
094                this.expression=SelectorParser.parse(str);
095            }
096            dataManager = new AsyncDataManager();
097            dataManager.setArchiveDataLogs(false);
098            if (file.isDirectory()) {
099                dataManager.setDirectory(file);
100            } else {
101                dataManager.setDirectory(file.getParentFile());
102                dataManager.setDirectoryArchive(file);
103                this.file = file;
104            }
105            dataManager.start();
106        }
107    
108        public Iterator<Message> iterator() {
109            return new AMQIterator(this,this.expression);
110        }
111    
112        
113        protected MessageLocation getNextMessage(MessageLocation lastLocation)
114                throws IllegalStateException, IOException {
115            if (this.file != null) {
116                return getInternalNextMessage(this.file, lastLocation);
117            }
118            return getInternalNextMessage(lastLocation);
119        }
120    
121        private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
122                throws IllegalStateException, IOException {
123            return getInternalNextMessage(null, lastLocation);
124        }
125    
126        private MessageLocation getInternalNextMessage(File file,
127                MessageLocation lastLocation) throws IllegalStateException,
128                IOException {
129            MessageLocation result = lastLocation;
130            if (result != null) {
131                result.setMessage(null);
132            }
133            Message message = null;
134            Location pos = lastLocation != null ? lastLocation.getLocation() : null;
135            while ((pos = getNextLocation(file, pos)) != null) {
136                message = getMessage(pos);
137                if (message != null) {
138                    if (result == null) {
139                        result = new MessageLocation();
140                    }
141                    result.setMessage(message);
142                    break;
143                }
144            }
145            result.setLocation(pos);
146            if (pos == null && message == null) {
147                result = null;
148            } else {
149                result.setLocation(pos);
150            }
151            return result;
152        }
153    
154        private Location getNextLocation(File file, Location last)
155                throws IllegalStateException, IOException {
156            if (file != null) {
157                return dataManager.getNextLocation(file, last, true);
158            }
159            return dataManager.getNextLocation(last);
160        }
161    
162        private Message getMessage(Location location) throws IOException {
163            ByteSequence data = dataManager.read(location);
164            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
165            if (c instanceof Message) {
166                return (Message) c;
167            }
168            return null;
169    
170        }
171    }