001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.Enumeration;
020import java.util.concurrent.atomic.AtomicBoolean;
021
022import javax.jms.*;
023import javax.jms.IllegalStateException;
024
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ConsumerId;
027import org.apache.activemq.command.MessageDispatch;
028import org.apache.activemq.selector.SelectorParser;
029
030/**
031 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
032 * queue without removing them. <p/>
033 * <P>
034 * The <CODE>getEnumeration</CODE> method returns a <CODE>
035 * java.util.Enumeration</CODE>
036 * that is used to scan the queue's messages. It may be an enumeration of the
037 * entire content of a queue, or it may contain only the messages matching a
038 * message selector. <p/>
039 * <P>
040 * Messages may be arriving and expiring while the scan is done. The JMS API
041 * does not require the content of an enumeration to be a static snapshot of
042 * queue content. Whether these changes are visible or not depends on the JMS
043 * provider. <p/>
044 * <P>
045 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
046 * </CODE>
047 * or a <CODE>QueueSession</CODE>.
048 *
049 * @see javax.jms.Session#createBrowser
050 * @see javax.jms.QueueSession#createBrowser
051 * @see javax.jms.QueueBrowser
052 * @see javax.jms.QueueReceiver
053 */
054
055public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
056
057    private final ActiveMQSession session;
058    private final ActiveMQDestination destination;
059    private final String selector;
060
061    private ActiveMQMessageConsumer consumer;
062    private boolean closed;
063    private final ConsumerId consumerId;
064    private final AtomicBoolean browseDone = new AtomicBoolean(true);
065    private final boolean dispatchAsync;
066    private Object semaphore = new Object();
067
068    /**
069     * Constructor for an ActiveMQQueueBrowser - used internally
070     * @throws JMSException
071     */
072    protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
073        if (destination == null) {
074            throw new InvalidDestinationException("Don't understand null destinations");
075        } else if (destination.getPhysicalName() == null) {
076            throw new InvalidDestinationException("The destination object was not given a physical name.");
077        }
078        if (selector != null && selector.trim().length() != 0) {
079            // Validate the selector
080            SelectorParser.parse(selector);
081        }
082
083        this.session = session;
084        this.consumerId = consumerId;
085        this.destination = destination;
086        this.selector = selector;
087        this.dispatchAsync = dispatchAsync;
088    }
089
090    /**
091     * @throws JMSException
092     */
093    private ActiveMQMessageConsumer createConsumer() throws JMSException {
094        browseDone.set(false);
095        ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
096
097        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
098            .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
099            public void dispatch(MessageDispatch md) {
100                if (md.getMessage() == null) {
101                    browseDone.set(true);
102                } else {
103                    super.dispatch(md);
104                }
105                notifyMessageAvailable();
106            }
107        };
108    }
109
110    private void destroyConsumer() {
111        if (consumer == null) {
112            return;
113        }
114        try {
115            if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) {
116                session.commit();
117            }
118            consumer.close();
119            consumer = null;
120        } catch (JMSException e) {
121            e.printStackTrace();
122        }
123    }
124
125    /**
126     * Gets an enumeration for browsing the current queue messages in the order
127     * they would be received.
128     *
129     * @return an enumeration for browsing the messages
130     * @throws JMSException if the JMS provider fails to get the enumeration for
131     *                 this browser due to some internal error.
132     */
133
134    public Enumeration getEnumeration() throws JMSException {
135        checkClosed();
136        if (consumer == null) {
137            consumer = createConsumer();
138        }
139        return this;
140    }
141
142    private void checkClosed() throws IllegalStateException {
143        if (closed) {
144            throw new IllegalStateException("The Consumer is closed");
145        }
146    }
147
148    /**
149     * @return true if more messages to process
150     */
151    public boolean hasMoreElements() {
152        while (true) {
153
154            synchronized (this) {
155                if (consumer == null) {
156                    return false;
157                }
158            }
159
160            if (consumer.getMessageSize() > 0) {
161                return true;
162            }
163
164            if (browseDone.get() || !session.isRunning()) {
165                destroyConsumer();
166                return false;
167            }
168
169            waitForMessage();
170        }
171    }
172
173    /**
174     * @return the next message
175     */
176    public Object nextElement() {
177        while (true) {
178
179            synchronized (this) {
180                if (consumer == null) {
181                    return null;
182                }
183            }
184
185            try {
186                javax.jms.Message answer = consumer.receiveNoWait();
187                if (answer != null) {
188                    return answer;
189                }
190            } catch (JMSException e) {
191                this.session.connection.onClientInternalException(e);
192                return null;
193            }
194
195            if (browseDone.get() || !session.isRunning()) {
196                destroyConsumer();
197                return null;
198            }
199
200            waitForMessage();
201        }
202    }
203
204    public synchronized void close() throws JMSException {
205        browseDone.set(true);
206        destroyConsumer();
207        closed = true;
208    }
209
210    /**
211     * Gets the queue associated with this queue browser.
212     *
213     * @return the queue
214     * @throws JMSException if the JMS provider fails to get the queue
215     *                 associated with this browser due to some internal error.
216     */
217
218    public Queue getQueue() throws JMSException {
219        return (Queue)destination;
220    }
221
222    public String getMessageSelector() throws JMSException {
223        return selector;
224    }
225
226    // Implementation methods
227    // -------------------------------------------------------------------------
228
229    /**
230     * Wait on a semaphore for a fixed amount of time for a message to come in.
231     * @throws JMSException
232     */
233    protected void waitForMessage() {
234        try {
235            consumer.sendPullCommand(-1);
236            synchronized (semaphore) {
237                semaphore.wait(2000);
238            }
239        } catch (InterruptedException e) {
240            Thread.currentThread().interrupt();
241        } catch (JMSException e) {
242        }
243
244    }
245
246    protected void notifyMessageAvailable() {
247        synchronized (semaphore) {
248            semaphore.notifyAll();
249        }
250    }
251
252    public String toString() {
253        return "ActiveMQQueueBrowser { value=" + consumerId + " }";
254    }
255
256}