001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.command;
018    
019    import java.net.URI;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.StringTokenizer;
024    
025    import javax.jms.Destination;
026    import javax.jms.Message;
027    import javax.management.MBeanServerConnection;
028    import javax.management.MBeanServerInvocationHandler;
029    import javax.management.ObjectInstance;
030    import javax.management.ObjectName;
031    import javax.management.openmbean.CompositeData;
032    import javax.management.remote.JMXConnector;
033    
034    import org.apache.activemq.broker.jmx.QueueViewMBean;
035    import org.apache.activemq.command.ActiveMQQueue;
036    import org.apache.activemq.console.util.AmqMessagesUtil;
037    import org.apache.activemq.console.util.JmxMBeansUtil;
038    
039    public class PurgeCommand extends AbstractJmxCommand {
040    
041        protected String[] helpFile = new String[] {
042            "Task Usage: Main purge [browse-options] <destinations>",
043            "Description: Delete selected destination's messages that matches the message selector.", 
044            "", 
045            "Purge Options:",
046            "    --msgsel <msgsel1,msglsel2>   Add to the search list messages matched by the query similar to",
047            "                                  the messages selector format.",
048            "    --jmxurl <url>                Set the JMX URL to connect to.",
049            "    --pid <pid>                   Set the pid to connect to (only on Sun JVM).",            
050            "    --jmxuser <user>              Set the JMX user used for authenticating.",
051            "    --jmxpassword <password>      Set the JMX password used for authenticating.",
052            "    --jmxlocal                    Use the local JMX server instead of a remote one.",
053            "    --version                     Display the version information.",
054            "    -h,-?,--help                  Display the browse broker help information.", 
055            "", 
056            "Examples:",
057            "    Main purge FOO.BAR", 
058            "        - Delete all the messages in queue FOO.BAR",
059    
060            "    Main purge --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.*", 
061            "        - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in",
062            "          the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the",
063            "          queue FOO.BAR.",
064            "          SLQ92 syntax is also supported.",
065            "        * To use wildcard queries, the field must be a string and the query enclosed in ''",
066            "          Use double quotes \"\" around the entire message selector string.",
067            ""
068        };
069    
070        private final List<String> queryAddObjects = new ArrayList<String>(10);
071        private final List<String> querySubObjects = new ArrayList<String>(10);
072    
073        @Override
074        public String getName() {
075            return "purge";
076        }
077    
078        @Override
079        public String getOneLineDescription() {
080            return "Delete selected destination's messages that matches the message selector";
081        }
082    
083        /**
084         * Execute the purge command, which allows you to purge the messages in a
085         * given JMS destination
086         * 
087         * @param tokens - command arguments
088         * @throws Exception
089         */
090        protected void runTask(List<String> tokens) throws Exception {
091            try {
092                // If there is no queue name specified, let's select all
093                if (tokens.isEmpty()) {
094                    tokens.add("*");
095                }
096    
097                // Iterate through the queue names
098                for (Iterator<String> i = tokens.iterator(); i.hasNext();) {
099                    List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "destinationType=Queue,destinationName=" + i.next() + ",*");
100    
101                    for (Iterator j = queueList.iterator(); j.hasNext();) {
102                        ObjectName queueName = ((ObjectInstance)j.next()).getObjectName();
103                        if (queryAddObjects.isEmpty()) {
104                            purgeQueue(queueName);
105                        } else {
106                            
107                            QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.
108                                            newProxyInstance(createJmxConnection(), 
109                                                            queueName, 
110                                                            QueueViewMBean.class, 
111                                                            true);
112                            int removed = 0;
113                            
114                            // AMQ-3404: We support two syntaxes for the message 
115                            // selector query:
116                            // 1) AMQ specific: 
117                            //    "JMSPriority>2,MyHeader='Foo'"
118                            //
119                            // 2) SQL-92 syntax:
120                            //    "(JMSPriority>2) AND (MyHeader='Foo')"
121                            //
122                            // If syntax style 1) is used, the comma separated
123                            // criterias are broken into List<String> elements. 
124                            // We then need to construct the SQL-92 query out of 
125                            // this list.
126                            
127                            String sqlQuery = null;
128                            if (queryAddObjects.size() > 1) {
129                                     sqlQuery = convertToSQL92(queryAddObjects);
130                            } else {
131                                    sqlQuery = queryAddObjects.get(0);
132                            }
133                            removed = proxy.removeMatchingMessages(sqlQuery);
134                            context.printInfo("Removed: " + removed
135                                    + " messages for message selector " + sqlQuery.toString());
136                        }
137                    }
138                }
139            } catch (Exception e) {
140                context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e));
141                throw new Exception(e);
142            }
143        }
144        
145        
146        /**
147         * Purge all the messages in the queue
148         * 
149         * @param queue - ObjectName of the queue to purge
150         * @throws Exception
151         */
152        public void purgeQueue(ObjectName queue) throws Exception {
153            context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("destinationName"));
154            createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {});
155        }
156    
157        /**
158         * Handle the --msgsel, --xmsgsel.
159         * 
160         * @param token - option token to handle
161         * @param tokens - succeeding command arguments
162         * @throws Exception
163         */
164        protected void handleOption(String token, List<String> tokens) throws Exception {
165            // If token is an additive message selector option
166            if (token.startsWith("--msgsel")) {
167    
168                // If no message selector is specified, or next token is a new
169                // option
170                if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
171                    context.printException(new IllegalArgumentException("Message selector not specified"));
172                    return;
173                }
174    
175                StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
176                while (queryTokens.hasMoreTokens()) {
177                    queryAddObjects.add(queryTokens.nextToken());
178                }
179            } else if (token.startsWith("--xmsgsel")) {
180                // If token is a substractive message selector option
181    
182                // If no message selector is specified, or next token is a new
183                // option
184                if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
185                    context.printException(new IllegalArgumentException("Message selector not specified"));
186                    return;
187                }
188    
189                StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
190                while (queryTokens.hasMoreTokens()) {
191                    querySubObjects.add(queryTokens.nextToken());
192                }
193    
194            } else {
195                // Let super class handle unknown option
196                super.handleOption(token, tokens);
197            }
198        }
199        
200        /**
201         * Converts the message selector as provided on command line
202         * argument to activem-admin into an SQL-92 conform string. 
203         * E.g.
204         *   "JMSMessageID='*:10',JMSPriority>5"
205         * gets converted into 
206         *   "(JMSMessageID='%:10') AND (JMSPriority>5)"
207         * 
208         * @param tokens - List of message selector query parameters 
209         * @return SQL-92 string of that query. 
210         */
211        public String convertToSQL92(List<String> tokens) {
212            String selector = "";
213    
214            // Convert to message selector
215            for (Iterator i = tokens.iterator(); i.hasNext(); ) {
216                selector = selector + "(" + i.next().toString() + ") AND ";
217            }
218    
219            // Remove last AND and replace '*' with '%'
220            if (!selector.equals("")) {
221                selector = selector.substring(0, selector.length() - 5);
222                selector = selector.replace('*', '%');
223            }
224            return selector;
225        }
226        
227    
228        /**
229         * Print the help messages for the browse command
230         */
231        protected void printHelp() {
232            context.printHelp(helpFile);
233        }
234    
235    }