001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.command.store.amq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.PrintWriter;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.Collections;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Scanner;
030    
031    import org.apache.activemq.command.ActiveMQBlobMessage;
032    import org.apache.activemq.command.ActiveMQBytesMessage;
033    import org.apache.activemq.command.ActiveMQMapMessage;
034    import org.apache.activemq.command.ActiveMQMessage;
035    import org.apache.activemq.command.ActiveMQObjectMessage;
036    import org.apache.activemq.command.ActiveMQStreamMessage;
037    import org.apache.activemq.command.ActiveMQTextMessage;
038    import org.apache.activemq.command.DataStructure;
039    import org.apache.activemq.command.JournalQueueAck;
040    import org.apache.activemq.command.JournalTopicAck;
041    import org.apache.activemq.command.JournalTrace;
042    import org.apache.activemq.command.JournalTransaction;
043    import org.apache.activemq.kaha.impl.async.Location;
044    import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
045    import org.apache.activemq.openwire.OpenWireFormat;
046    import org.apache.activemq.util.ByteSequence;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.apache.velocity.Template;
049    import org.apache.velocity.VelocityContext;
050    import org.apache.velocity.app.Velocity;
051    import org.apache.velocity.app.VelocityEngine;
052    import org.josql.Query;
053    
054    /**
055     * Allows you to view the contents of a Journal.
056     * 
057     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
058     */
059    public class AMQJournalTool {
060    
061            private final ArrayList<File> dirs = new ArrayList<File>();
062            private final WireFormat wireFormat = new OpenWireFormat();
063            private final HashMap<String, String> resources = new HashMap<String, String>();
064    
065            private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
066            private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
067            private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
068            private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
069            private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
070            private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
071            private String where;
072            private VelocityContext context;
073            private VelocityEngine velocity;
074            private boolean help;
075    
076            public static void main(String[] args) throws Exception {
077                    AMQJournalTool consumerTool = new AMQJournalTool();
078                    String[] directories = CommandLineSupport
079                                    .setOptions(consumerTool, args);
080                    if (directories.length < 1) {
081                            System.out
082                                            .println("Please specify the directories with journal data to scan");
083                            return;
084                    }
085                    for (int i = 0; i < directories.length; i++) {
086                            consumerTool.getDirs().add(new File(directories[i]));
087                    }
088                    consumerTool.execute();
089            }
090    
091            /**
092             * Creates a new VelocityContext that is pre-populated with the JVMs
093             * system properties. 
094             * 
095             * @return - the VelocityContext that got created.
096             */
097            protected VelocityContext createVelocityContext() {
098                    VelocityContext ctx = new VelocityContext();
099                    List keys = Arrays.asList(ctx.getKeys());
100    
101                    for (Iterator iterator = System.getProperties().entrySet()
102                                    .iterator(); iterator.hasNext();) {
103                            Map.Entry kv = (Map.Entry) iterator.next();
104                            String name = (String) kv.getKey();
105                            String value = (String) kv.getValue();
106    
107                            if (!keys.contains(name)) {
108                                    ctx.put(name, value);
109                            }
110                    }
111                    return ctx;
112            }
113            
114            
115            public void execute() throws Exception {
116    
117                    if( help ) {
118                            showHelp();
119                            return;
120                    }
121                    
122                    if (getDirs().size() < 1) {
123                            System.out.println("");
124                            System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
125                            System.out.println("");
126                            showHelp();
127                            return;
128                    }
129    
130                    for (File dir : getDirs()) {
131                            if( !dir.exists() ) {
132                                    System.out.println("");
133                                    System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
134                                    System.out.println("");
135                                    showHelp();
136                                    return;
137                            }
138                            if( !dir.isDirectory() ) {
139                                    System.out.println("");
140                                    System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
141                                    System.out.println("");
142                                    showHelp();
143                                    return;
144                            }
145                    }
146                    
147                    context = createVelocityContext();
148                    
149                    velocity = new VelocityEngine();
150                    velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
151                    velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
152                    velocity.init();
153    
154                    resources.put("message", messageFormat);
155                    resources.put("topicAck", topicAckFormat);
156                    resources.put("queueAck", queueAckFormat);
157                    resources.put("transaction", transactionFormat);
158                    resources.put("trace", traceFormat);
159                    resources.put("unknown", unknownFormat);
160                    
161                    Query query = null;
162                    if (where != null) {
163                            query = new Query();
164                            query.parse("select * from "+Entry.class.getName()+" where "+where);
165    
166                    }
167    
168                    ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
169                    manager.start();
170                    try {
171                            Location curr = manager.getFirstLocation();
172                            while (curr != null) {
173    
174                                    ByteSequence data = manager.read(curr);
175                                    DataStructure c = (DataStructure) wireFormat.unmarshal(data);
176    
177                                    Entry entry = new Entry();
178                                    entry.setLocation(curr);
179                                    entry.setRecord(c);
180                                    entry.setData(data);
181                                    entry.setQuery(query);
182                                    process(entry);
183    
184                                    curr = manager.getNextLocation(curr);           
185                            }
186                    } finally {
187                            manager.close();
188                    }
189            }
190    
191            private void showHelp() {
192                    InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
193                    Scanner scanner = new Scanner(is);
194                    while (scanner.hasNextLine()) {
195                            String line = scanner.nextLine();
196                            System.out.println(line);
197                    }
198                    scanner.close();        }
199    
200            private void process(Entry entry) throws Exception {
201    
202                    DataStructure record = entry.getRecord();
203    
204                    switch (record.getDataStructureType()) {
205                    case ActiveMQMessage.DATA_STRUCTURE_TYPE:
206                            entry.setType("ActiveMQMessage");
207                            entry.setFormater("message");
208                            display(entry);
209                            break;
210                    case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
211                            entry.setType("ActiveMQBytesMessage");
212                            entry.setFormater("message");
213                            display(entry);
214                            break;
215                    case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
216                            entry.setType("ActiveMQBlobMessage");
217                            entry.setFormater("message");
218                            display(entry);
219                            break;
220                    case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
221                            entry.setType("ActiveMQMapMessage");
222                            entry.setFormater("message");
223                            display(entry);
224                            break;
225                    case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
226                            entry.setType("ActiveMQObjectMessage");
227                            entry.setFormater("message");
228                            display(entry);
229                            break;
230                    case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
231                            entry.setType("ActiveMQStreamMessage");
232                            entry.setFormater("message");
233                            display(entry);
234                            break;
235                    case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
236                            entry.setType("ActiveMQTextMessage");
237                            entry.setFormater("message");
238                            display(entry);
239                            break;
240                    case JournalQueueAck.DATA_STRUCTURE_TYPE:
241                            entry.setType("Queue Ack");
242                            entry.setFormater("queueAck");
243                            display(entry);
244                            break;
245                    case JournalTopicAck.DATA_STRUCTURE_TYPE:
246                            entry.setType("Topic Ack");
247                            entry.setFormater("topicAck");
248                            display(entry);
249                            break;
250                    case JournalTransaction.DATA_STRUCTURE_TYPE:
251                            entry.setType(getType((JournalTransaction) record));
252                            entry.setFormater("transaction");
253                            display(entry);
254                            break;
255                    case JournalTrace.DATA_STRUCTURE_TYPE:
256                            entry.setType("Trace");
257                            entry.setFormater("trace");
258                            display(entry);
259                            break;
260                    default:
261                            entry.setType("Unknown");
262                            entry.setFormater("unknown");
263                            display(entry);
264                            break;
265                    }
266            }
267    
268            private String getType(JournalTransaction record) {
269                    switch (record.getType()) {
270                    case JournalTransaction.XA_PREPARE:
271                            return "XA Prepare";
272                    case JournalTransaction.XA_COMMIT:
273                            return "XA Commit";
274                    case JournalTransaction.XA_ROLLBACK:
275                            return "XA Rollback";
276                    case JournalTransaction.LOCAL_COMMIT:
277                            return "Commit";
278                    case JournalTransaction.LOCAL_ROLLBACK:
279                            return "Rollback";
280                    }
281                    return "Unknown Transaction";
282            }
283    
284            private void display(Entry entry) throws Exception {
285    
286                    if (entry.getQuery() != null) {
287                            List list = Collections.singletonList(entry);
288                            List results = entry.getQuery().execute(list).getResults();
289                            if (results.isEmpty()) {
290                                    return;
291                            }
292                    }
293    
294                    CustomResourceLoader.setResources(resources);
295                    try {
296    
297                            context.put("location", entry.getLocation());
298                            context.put("record", entry.getRecord());
299                            context.put("type", entry.getType());
300                            if (entry.getRecord() instanceof ActiveMQMessage) {
301                                    context.put("body", new MessageBodyFormatter(
302                                                    (ActiveMQMessage) entry.getRecord()));
303                            }
304    
305                            Template template = velocity.getTemplate(entry.getFormater());
306                            PrintWriter writer = new PrintWriter(System.out);
307                            template.merge(context, writer);
308                            writer.println();
309                            writer.flush();
310                    } finally {
311                            CustomResourceLoader.setResources(null);
312                    }
313            }
314    
315            public void setMessageFormat(String messageFormat) {
316                    this.messageFormat = messageFormat;
317            }
318    
319            public void setTopicAckFormat(String ackFormat) {
320                    this.topicAckFormat = ackFormat;
321            }
322    
323            public void setTransactionFormat(String transactionFormat) {
324                    this.transactionFormat = transactionFormat;
325            }
326    
327            public void setTraceFormat(String traceFormat) {
328                    this.traceFormat = traceFormat;
329            }
330    
331            public void setUnknownFormat(String unknownFormat) {
332                    this.unknownFormat = unknownFormat;
333            }
334    
335            public void setQueueAckFormat(String queueAckFormat) {
336                    this.queueAckFormat = queueAckFormat;
337            }
338    
339            public String getQuery() {
340                    return where;
341            }
342    
343            public void setWhere(String query) {
344                    this.where = query;
345            }
346    
347            public boolean isHelp() {
348                    return help;
349            }
350    
351            public void setHelp(boolean help) {
352                    this.help = help;
353            }
354    
355            /**
356             * @return the dirs
357             */
358            public ArrayList<File> getDirs() {
359                    return dirs;
360            }
361    
362    }