001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.amq.reader;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.HashSet;
022 import java.util.Iterator;
023 import java.util.Set;
024 import javax.jms.InvalidSelectorException;
025 import javax.jms.Message;
026
027 import org.apache.activemq.command.DataStructure;
028 import org.apache.activemq.filter.BooleanExpression;
029 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
030 import org.apache.activemq.kaha.impl.async.Location;
031 import org.apache.activemq.openwire.OpenWireFormat;
032 import org.apache.activemq.selector.SelectorParser;
033 import org.apache.activemq.util.ByteSequence;
034 import org.apache.activemq.wireformat.WireFormat;
035
036 /**
037 * Reads and iterates through data log files for the AMQMessage Store
038 *
039 */
040 public class AMQReader implements Iterable<Message> {
041
042 private AsyncDataManager dataManager;
043 private WireFormat wireFormat = new OpenWireFormat();
044 private File file;
045 private BooleanExpression expression;
046
047 /**
048 * List all the data files in a directory
049 * @param directory
050 * @return
051 * @throws IOException
052 */
053 public static Set<File> listDataFiles(File directory) throws IOException{
054 Set<File>result = new HashSet<File>();
055 if (directory == null || !directory.exists() || !directory.isDirectory()) {
056 throw new IOException("Invalid Directory " + directory);
057 }
058 AsyncDataManager dataManager = new AsyncDataManager();
059 dataManager.setDirectory(directory);
060 dataManager.start();
061 Set<File> set = dataManager.getFiles();
062 if (set != null) {
063 result.addAll(set);
064 }
065 dataManager.close();
066 return result;
067 }
068 /**
069 * Create the AMQReader to read a directory of amq data logs - or an
070 * individual data log file
071 *
072 * @param file the directory - or file
073 * @throws IOException
074 * @throws InvalidSelectorException
075 * @throws IOException
076 * @throws InvalidSelectorException
077 */
078 public AMQReader(File file) throws InvalidSelectorException, IOException {
079 this(file,null);
080 }
081
082 /**
083 * Create the AMQReader to read a directory of amq data logs - or an
084 * individual data log file
085 *
086 * @param file the directory - or file
087 * @param selector the JMS selector or null to select all
088 * @throws IOException
089 * @throws InvalidSelectorException
090 */
091 public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
092 String str = selector != null ? selector.trim() : null;
093 if (str != null && str.length() > 0) {
094 this.expression=SelectorParser.parse(str);
095 }
096 dataManager = new AsyncDataManager();
097 dataManager.setArchiveDataLogs(false);
098 if (file.isDirectory()) {
099 dataManager.setDirectory(file);
100 } else {
101 dataManager.setDirectory(file.getParentFile());
102 dataManager.setDirectoryArchive(file);
103 this.file = file;
104 }
105 dataManager.start();
106 }
107
108 public Iterator<Message> iterator() {
109 return new AMQIterator(this,this.expression);
110 }
111
112
113 protected MessageLocation getNextMessage(MessageLocation lastLocation)
114 throws IllegalStateException, IOException {
115 if (this.file != null) {
116 return getInternalNextMessage(this.file, lastLocation);
117 }
118 return getInternalNextMessage(lastLocation);
119 }
120
121 private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
122 throws IllegalStateException, IOException {
123 return getInternalNextMessage(null, lastLocation);
124 }
125
126 private MessageLocation getInternalNextMessage(File file,
127 MessageLocation lastLocation) throws IllegalStateException,
128 IOException {
129 MessageLocation result = lastLocation;
130 if (result != null) {
131 result.setMessage(null);
132 }
133 Message message = null;
134 Location pos = lastLocation != null ? lastLocation.getLocation() : null;
135 while ((pos = getNextLocation(file, pos)) != null) {
136 message = getMessage(pos);
137 if (message != null) {
138 if (result == null) {
139 result = new MessageLocation();
140 }
141 result.setMessage(message);
142 break;
143 }
144 }
145 result.setLocation(pos);
146 if (pos == null && message == null) {
147 result = null;
148 } else {
149 result.setLocation(pos);
150 }
151 return result;
152 }
153
154 private Location getNextLocation(File file, Location last)
155 throws IllegalStateException, IOException {
156 if (file != null) {
157 return dataManager.getNextLocation(file, last, true);
158 }
159 return dataManager.getNextLocation(last);
160 }
161
162 private Message getMessage(Location location) throws IOException {
163 ByteSequence data = dataManager.read(location);
164 DataStructure c = (DataStructure) wireFormat.unmarshal(data);
165 if (c instanceof Message) {
166 return (Message) c;
167 }
168 return null;
169
170 }
171 }