001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.command.store.amq.reader;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.Set;
024    
025    import javax.jms.InvalidSelectorException;
026    import javax.jms.Message;
027    
028    import org.apache.activemq.command.DataStructure;
029    import org.apache.activemq.filter.BooleanExpression;
030    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
031    import org.apache.activemq.kaha.impl.async.Location;
032    import org.apache.activemq.openwire.OpenWireFormat;
033    import org.apache.activemq.selector.SelectorParser;
034    import org.apache.activemq.util.ByteSequence;
035    import org.apache.activemq.wireformat.WireFormat;
036    
037    /**
038     * Reads and iterates through data log files for the AMQMessage Store
039     *
040     */
041    public class AMQReader implements Iterable<Message> {
042    
043        private AsyncDataManager dataManager;
044        private WireFormat wireFormat = new OpenWireFormat();
045        private File file;
046        private BooleanExpression expression;
047    
048        /**
049         * List all the data files in a directory
050         * @param directory
051         * @return result
052         * @throws IOException
053         */
054        public static Set<File> listDataFiles(File directory) throws IOException{
055            Set<File>result = new HashSet<File>();
056            if (directory == null || !directory.exists() || !directory.isDirectory()) {
057                throw new IOException("Invalid Directory " + directory);
058            }
059            AsyncDataManager dataManager = new AsyncDataManager();
060            dataManager.setDirectory(directory);
061            dataManager.start();
062            Set<File> set = dataManager.getFiles();
063            if (set != null) {
064                result.addAll(set);
065            }
066            dataManager.close();
067            return result;
068        }
069        /**
070         * Create the AMQReader to read a directory of amq data logs - or an
071         * individual data log file
072         *
073         * @param file the directory - or file
074         * @throws IOException
075         * @throws InvalidSelectorException
076         * @throws IOException
077         * @throws InvalidSelectorException
078         */
079        public AMQReader(File file) throws InvalidSelectorException, IOException {
080            this(file,null);
081        }
082    
083        /**
084         * Create the AMQReader to read a directory of amq data logs - or an
085         * individual data log file
086         *
087         * @param file the directory - or file
088         * @param selector the JMS selector or null to select all
089         * @throws IOException
090         * @throws InvalidSelectorException
091         */
092        public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
093            String str = selector != null ? selector.trim() : null;
094            if (str != null && str.length() > 0) {
095                this.expression=SelectorParser.parse(str);
096            }
097            dataManager = new AsyncDataManager();
098            dataManager.setArchiveDataLogs(false);
099            if (file.isDirectory()) {
100                dataManager.setDirectory(file);
101            } else {
102                dataManager.setDirectory(file.getParentFile());
103                dataManager.setDirectoryArchive(file);
104                this.file = file;
105            }
106            dataManager.start();
107        }
108    
109        public Iterator<Message> iterator() {
110            return new AMQIterator(this,this.expression);
111        }
112    
113    
114        protected MessageLocation getNextMessage(MessageLocation lastLocation)
115                throws IllegalStateException, IOException {
116            if (this.file != null) {
117                return getInternalNextMessage(this.file, lastLocation);
118            }
119            return getInternalNextMessage(lastLocation);
120        }
121    
122        private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
123                throws IllegalStateException, IOException {
124            return getInternalNextMessage(null, lastLocation);
125        }
126    
127        private MessageLocation getInternalNextMessage(File file,
128                MessageLocation lastLocation) throws IllegalStateException,
129                IOException {
130            MessageLocation result = lastLocation;
131            if (result != null) {
132                result.setMessage(null);
133            }
134            Message message = null;
135            Location pos = lastLocation != null ? lastLocation.getLocation() : null;
136            while ((pos = getNextLocation(file, pos)) != null) {
137                message = getMessage(pos);
138                if (message != null) {
139                    if (result == null) {
140                        result = new MessageLocation();
141                    }
142                    result.setMessage(message);
143                    break;
144                }
145            }
146            result.setLocation(pos);
147            if (pos == null && message == null) {
148                result = null;
149            } else {
150                result.setLocation(pos);
151            }
152            return result;
153        }
154    
155        private Location getNextLocation(File file, Location last)
156                throws IllegalStateException, IOException {
157            if (file != null) {
158                return dataManager.getNextLocation(file, last, true);
159            }
160            return dataManager.getNextLocation(last);
161        }
162    
163        private Message getMessage(Location location) throws IOException {
164            ByteSequence data = dataManager.read(location);
165            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
166            if (c instanceof Message) {
167                return (Message) c;
168            }
169            return null;
170    
171        }
172    }