001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.filter;
018
019import java.net.URI;
020import java.util.Collections;
021import java.util.List;
022
023import javax.jms.Connection;
024import javax.jms.ConnectionFactory;
025import javax.jms.Destination;
026import javax.jms.JMSException;
027import javax.jms.QueueBrowser;
028import javax.jms.Session;
029
030import org.apache.activemq.ActiveMQConnectionFactory;
031import org.apache.activemq.command.ActiveMQQueue;
032import org.apache.activemq.command.ActiveMQTopic;
033
034public class AmqMessagesQueryFilter extends AbstractQueryFilter {
035
036    private URI brokerUrl;
037    private Destination destination;
038
039    private ConnectionFactory connectionFactory;
040
041    /**
042     * Create a JMS message query filter
043     *
044     * @param brokerUrl   - broker url to connect to
045     * @param destination - JMS destination to query
046     */
047    public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
048        super(null);
049        this.brokerUrl = brokerUrl;
050        this.destination = destination;
051    }
052
053    /**
054     * Create a JMS message query filter
055     *
056     * @param connectionFactory - to connect with
057     * @param destination - JMS destination to query
058     */
059    public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) {
060        super(null);
061        this.destination = destination;
062        this.connectionFactory = connectionFactory;
063    }
064
065    /**
066     * Queries the specified destination using the message selector format query
067     *
068     * @param queries - message selector queries
069     * @return list messages that matches the selector
070     * @throws Exception
071     */
072    public List query(List queries) throws Exception {
073        String selector = "";
074
075        // Convert to message selector
076        for (Object query : queries) {
077            selector = selector + "(" + query.toString() + ") AND ";
078        }
079
080        // Remove last AND
081        if (!selector.equals("")) {
082            selector = selector.substring(0, selector.length() - 5);
083        }
084
085        if (destination instanceof ActiveMQQueue) {
086            return queryMessages((ActiveMQQueue) destination, selector);
087        } else {
088            return queryMessages((ActiveMQTopic) destination, selector);
089        }
090    }
091
092    /**
093     * Query the messages of a queue destination using a queue browser
094     *
095     * @param queue    - queue destination
096     * @param selector - message selector
097     * @return list of messages that matches the selector
098     * @throws Exception
099     */
100    protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
101        Connection conn = createConnection();
102
103        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
104        QueueBrowser browser = sess.createBrowser(queue, selector);
105
106        List messages = Collections.list(browser.getEnumeration());
107
108        conn.close();
109
110        return messages;
111    }
112
113    /**
114     * Query the messages of a topic destination using a message consumer
115     *
116     * @param topic    - topic destination
117     * @param selector - message selector
118     * @return list of messages that matches the selector
119     * @throws Exception
120     */
121    protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
122        // TODO: should we use a durable subscriber or a retroactive non-durable
123        // subscriber?
124        // TODO: if a durable subscriber is used, how do we manage it?
125        // subscribe/unsubscribe tasks?
126        return null;
127    }
128
129    /**
130     * Create and start a JMS connection
131     *
132     * @return JMS connection
133     * @throws JMSException
134     */
135    protected Connection createConnection() throws JMSException {
136        // maintain old behaviour, when called either way.
137        if (null == connectionFactory) {
138            connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl()));
139        }
140        Connection conn = connectionFactory.createConnection();
141        conn.start();
142        return conn;
143    }
144
145    /**
146     * Get the broker url being used.
147     *
148     * @return broker url
149     */
150    public URI getBrokerUrl() {
151        return brokerUrl;
152    }
153
154    /**
155     * Set the broker url to use.
156     *
157     * @param brokerUrl - broker url
158     */
159    public void setBrokerUrl(URI brokerUrl) {
160        this.brokerUrl = brokerUrl;
161    }
162
163    /**
164     * Get the destination being used.
165     *
166     * @return - JMS destination
167     */
168    public Destination getDestination() {
169        return destination;
170    }
171
172    /**
173     * Set the destination to use.
174     *
175     * @param destination - JMS destination
176     */
177    public void setDestination(Destination destination) {
178        this.destination = destination;
179    }
180}