001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.PrintWriter;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.Collections;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Scanner;
030    
031    import org.apache.activemq.command.ActiveMQBlobMessage;
032    import org.apache.activemq.command.ActiveMQBytesMessage;
033    import org.apache.activemq.command.ActiveMQMapMessage;
034    import org.apache.activemq.command.ActiveMQMessage;
035    import org.apache.activemq.command.ActiveMQObjectMessage;
036    import org.apache.activemq.command.ActiveMQStreamMessage;
037    import org.apache.activemq.command.ActiveMQTextMessage;
038    import org.apache.activemq.command.DataStructure;
039    import org.apache.activemq.command.JournalQueueAck;
040    import org.apache.activemq.command.JournalTopicAck;
041    import org.apache.activemq.command.JournalTrace;
042    import org.apache.activemq.command.JournalTransaction;
043    import org.apache.activemq.kaha.impl.async.Location;
044    import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
045    import org.apache.activemq.openwire.OpenWireFormat;
046    import org.apache.activemq.util.ByteSequence;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.apache.velocity.Template;
049    import org.apache.velocity.VelocityContext;
050    import org.apache.velocity.app.Velocity;
051    import org.apache.velocity.app.VelocityEngine;
052    import org.josql.Query;
053    
054    /**
055     * Allows you to view the contents of a Journal.
056     */
057    public class AMQJournalTool {
058    
059            private final ArrayList<File> dirs = new ArrayList<File>();
060            private final WireFormat wireFormat = new OpenWireFormat();
061            private final HashMap<String, String> resources = new HashMap<String, String>();
062    
063            private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
064            private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
065            private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
066            private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
067            private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
068            private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
069            private String where;
070            private VelocityContext context;
071            private VelocityEngine velocity;
072            private boolean help;
073    
074            public static void main(String[] args) throws Exception {
075                    AMQJournalTool consumerTool = new AMQJournalTool();
076                    String[] directories = CommandLineSupport
077                                    .setOptions(consumerTool, args);
078                    if (directories.length < 1) {
079                            System.out
080                                            .println("Please specify the directories with journal data to scan");
081                            return;
082                    }
083                    for (int i = 0; i < directories.length; i++) {
084                            consumerTool.getDirs().add(new File(directories[i]));
085                    }
086                    consumerTool.execute();
087            }
088    
089            public void execute() throws Exception {
090    
091                    if( help ) {
092                            showHelp();
093                            return;
094                    }
095                    
096                    if (getDirs().size() < 1) {
097                            System.out.println("");
098                            System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
099                            System.out.println("");
100                            showHelp();
101                            return;
102                    }
103    
104                    for (File dir : getDirs()) {
105                            if( !dir.exists() ) {
106                                    System.out.println("");
107                                    System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
108                                    System.out.println("");
109                                    showHelp();
110                                    return;
111                            }
112                            if( !dir.isDirectory() ) {
113                                    System.out.println("");
114                                    System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
115                                    System.out.println("");
116                                    showHelp();
117                                    return;
118                            }
119                    }
120                    
121                    
122                    context = new VelocityContext();
123                    List keys = Arrays.asList(context.getKeys());
124    
125                    for (Iterator iterator = System.getProperties().entrySet()
126                                    .iterator(); iterator.hasNext();) {
127                            Map.Entry kv = (Map.Entry) iterator.next();
128                            String name = (String) kv.getKey();
129                            String value = (String) kv.getValue();
130    
131                            if (!keys.contains(name)) {
132                                    context.put(name, value);
133                            }
134                    }
135                    
136                    velocity = new VelocityEngine();
137                    velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
138                    velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
139                    velocity.init();
140    
141    
142                    resources.put("message", messageFormat);
143                    resources.put("topicAck", topicAckFormat);
144                    resources.put("queueAck", queueAckFormat);
145                    resources.put("transaction", transactionFormat);
146                    resources.put("trace", traceFormat);
147                    resources.put("unknown", unknownFormat);
148    
149                    Query query = null;
150                    if (where != null) {
151                            query = new Query();
152                            query.parse("select * from "+Entry.class.getName()+" where "+where);
153    
154                    }
155    
156                    ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
157                    manager.start();
158                    try {
159                            Location curr = manager.getFirstLocation();
160                            while (curr != null) {
161    
162                                    ByteSequence data = manager.read(curr);
163                                    DataStructure c = (DataStructure) wireFormat.unmarshal(data);
164    
165                                    Entry entry = new Entry();
166                                    entry.setLocation(curr);
167                                    entry.setRecord(c);
168                                    entry.setData(data);
169                                    entry.setQuery(query);
170                                    process(entry);
171    
172                                    curr = manager.getNextLocation(curr);
173                            }
174                    } finally {
175                            manager.close();
176                    }
177            }
178    
179            private void showHelp() {
180                    InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
181                    Scanner scanner = new Scanner(is);
182                    while (scanner.hasNextLine()) {
183                            String line = scanner.nextLine();
184                            System.out.println(line);
185                    }
186                    scanner.close();        }
187    
188            private void process(Entry entry) throws Exception {
189    
190                    Location location = entry.getLocation();
191                    DataStructure record = entry.getRecord();
192    
193                    switch (record.getDataStructureType()) {
194                    case ActiveMQMessage.DATA_STRUCTURE_TYPE:
195                            entry.setType("ActiveMQMessage");
196                            entry.setFormater("message");
197                            display(entry);
198                            break;
199                    case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
200                            entry.setType("ActiveMQBytesMessage");
201                            entry.setFormater("message");
202                            display(entry);
203                            break;
204                    case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
205                            entry.setType("ActiveMQBlobMessage");
206                            entry.setFormater("message");
207                            display(entry);
208                            break;
209                    case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
210                            entry.setType("ActiveMQMapMessage");
211                            entry.setFormater("message");
212                            display(entry);
213                            break;
214                    case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
215                            entry.setType("ActiveMQObjectMessage");
216                            entry.setFormater("message");
217                            display(entry);
218                            break;
219                    case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
220                            entry.setType("ActiveMQStreamMessage");
221                            entry.setFormater("message");
222                            display(entry);
223                            break;
224                    case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
225                            entry.setType("ActiveMQTextMessage");
226                            entry.setFormater("message");
227                            display(entry);
228                            break;
229                    case JournalQueueAck.DATA_STRUCTURE_TYPE:
230                            entry.setType("Queue Ack");
231                            entry.setFormater("queueAck");
232                            display(entry);
233                            break;
234                    case JournalTopicAck.DATA_STRUCTURE_TYPE:
235                            entry.setType("Topic Ack");
236                            entry.setFormater("topicAck");
237                            display(entry);
238                            break;
239                    case JournalTransaction.DATA_STRUCTURE_TYPE:
240                            entry.setType(getType((JournalTransaction) record));
241                            entry.setFormater("transaction");
242                            display(entry);
243                            break;
244                    case JournalTrace.DATA_STRUCTURE_TYPE:
245                            entry.setType("Trace");
246                            entry.setFormater("trace");
247                            display(entry);
248                            break;
249                    default:
250                            entry.setType("Unknown");
251                            entry.setFormater("unknown");
252                            display(entry);
253                            break;
254                    }
255            }
256    
257            private String getType(JournalTransaction record) {
258                    switch (record.getType()) {
259                    case JournalTransaction.XA_PREPARE:
260                            return "XA Prepare";
261                    case JournalTransaction.XA_COMMIT:
262                            return "XA Commit";
263                    case JournalTransaction.XA_ROLLBACK:
264                            return "XA Rollback";
265                    case JournalTransaction.LOCAL_COMMIT:
266                            return "Commit";
267                    case JournalTransaction.LOCAL_ROLLBACK:
268                            return "Rollback";
269                    }
270                    return "Unknown Transaction";
271            }
272    
273            private void display(Entry entry) throws Exception {
274    
275                    if (entry.getQuery() != null) {
276                            List list = Collections.singletonList(entry);
277                            List results = entry.getQuery().execute(list).getResults();
278                            if (results.isEmpty()) {
279                                    return;
280                            }
281                    }
282    
283                    CustomResourceLoader.setResources(resources);
284                    try {
285    
286                            context.put("location", entry.getLocation());
287                            context.put("record", entry.getRecord());
288                            context.put("type", entry.getType());
289                            if (entry.getRecord() instanceof ActiveMQMessage) {
290                                    context.put("body", new MessageBodyFormatter(
291                                                    (ActiveMQMessage) entry.getRecord()));
292                            }
293    
294                            Template template = velocity.getTemplate(entry.getFormater());
295                            PrintWriter writer = new PrintWriter(System.out);
296                            template.merge(context, writer);
297                            writer.println();
298                            writer.flush();
299                    } finally {
300                            CustomResourceLoader.setResources(null);
301                    }
302            }
303    
304            public void setMessageFormat(String messageFormat) {
305                    this.messageFormat = messageFormat;
306            }
307    
308            public void setTopicAckFormat(String ackFormat) {
309                    this.topicAckFormat = ackFormat;
310            }
311    
312            public void setTransactionFormat(String transactionFormat) {
313                    this.transactionFormat = transactionFormat;
314            }
315    
316            public void setTraceFormat(String traceFormat) {
317                    this.traceFormat = traceFormat;
318            }
319    
320            public void setUnknownFormat(String unknownFormat) {
321                    this.unknownFormat = unknownFormat;
322            }
323    
324            public void setQueueAckFormat(String queueAckFormat) {
325                    this.queueAckFormat = queueAckFormat;
326            }
327    
328            public String getQuery() {
329                    return where;
330            }
331    
332            public void setWhere(String query) {
333                    this.where = query;
334            }
335    
336            public boolean isHelp() {
337                    return help;
338            }
339    
340            public void setHelp(boolean help) {
341                    this.help = help;
342            }
343    
344            /**
345             * @return the dirs
346             */
347            public ArrayList<File> getDirs() {
348                    return dirs;
349            }
350    
351    }