001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.filter;
018    
019    import java.net.URI;
020    import java.util.Collections;
021    import java.util.List;
022    
023    import javax.jms.Connection;
024    import javax.jms.ConnectionFactory;
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.QueueBrowser;
028    import javax.jms.Session;
029    
030    import org.apache.activemq.ActiveMQConnectionFactory;
031    import org.apache.activemq.command.ActiveMQQueue;
032    import org.apache.activemq.command.ActiveMQTopic;
033    
034    public class AmqMessagesQueryFilter extends AbstractQueryFilter {
035    
036        private URI brokerUrl;
037        private Destination destination;
038    
039        private ConnectionFactory connectionFactory;
040    
041        /**
042         * Create a JMS message query filter
043         *
044         * @param brokerUrl   - broker url to connect to
045         * @param destination - JMS destination to query
046         */
047        public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
048            super(null);
049            this.brokerUrl = brokerUrl;
050            this.destination = destination;
051        }
052    
053        /**
054         * Create a JMS message query filter
055         *
056         * @param brokerUrl   - broker url to connect to
057         * @param destination - JMS destination to query
058         */
059        public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) {
060            super(null);
061            this.destination = destination;
062            this.connectionFactory = connectionFactory;
063        }
064    
065        /**
066         * Queries the specified destination using the message selector format query
067         *
068         * @param queries - message selector queries
069         * @return list messages that matches the selector
070         * @throws Exception
071         */
072        public List query(List queries) throws Exception {
073            String selector = "";
074    
075            // Convert to message selector
076            for (Object query : queries) {
077                selector = selector + "(" + query.toString() + ") AND ";
078            }
079    
080            // Remove last AND
081            if (!selector.equals("")) {
082                selector = selector.substring(0, selector.length() - 5);
083            }
084    
085            if (destination instanceof ActiveMQQueue) {
086                return queryMessages((ActiveMQQueue) destination, selector);
087            } else {
088                return queryMessages((ActiveMQTopic) destination, selector);
089            }
090        }
091    
092        /**
093         * Query the messages of a queue destination using a queue browser
094         *
095         * @param queue    - queue destination
096         * @param selector - message selector
097         * @return list of messages that matches the selector
098         * @throws Exception
099         */
100        protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
101            Connection conn = createConnection();
102    
103            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
104            QueueBrowser browser = sess.createBrowser(queue, selector);
105    
106            List messages = Collections.list(browser.getEnumeration());
107    
108            conn.close();
109    
110            return messages;
111        }
112    
113        /**
114         * Query the messages of a topic destination using a message consumer
115         *
116         * @param topic    - topic destination
117         * @param selector - message selector
118         * @return list of messages that matches the selector
119         * @throws Exception
120         */
121        protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
122            // TODO: should we use a durable subscriber or a retroactive non-durable
123            // subscriber?
124            // TODO: if a durable subscriber is used, how do we manage it?
125            // subscribe/unsubscribe tasks?
126            return null;
127        }
128    
129        /**
130         * Create and start a JMS connection
131         *
132         * @return JMS connection
133         * @throws JMSException
134         */
135        protected Connection createConnection() throws JMSException {
136            // maintain old behaviour, when called either way.
137            if (null == connectionFactory) {
138                connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl()));
139            }
140            Connection conn = connectionFactory.createConnection();
141            conn.start();
142            return conn;
143        }
144    
145        /**
146         * Get the broker url being used.
147         *
148         * @return broker url
149         */
150        public URI getBrokerUrl() {
151            return brokerUrl;
152        }
153    
154        /**
155         * Set the broker url to use.
156         *
157         * @param brokerUrl - broker url
158         */
159        public void setBrokerUrl(URI brokerUrl) {
160            this.brokerUrl = brokerUrl;
161        }
162    
163        /**
164         * Get the destination being used.
165         *
166         * @return - JMS destination
167         */
168        public Destination getDestination() {
169            return destination;
170        }
171    
172        /**
173         * Set the destination to use.
174         *
175         * @param destination - JMS destination
176         */
177        public void setDestination(Destination destination) {
178            this.destination = destination;
179        }
180    }