001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.Enumeration;
020    import java.util.concurrent.atomic.AtomicBoolean;
021    
022    import javax.jms.*;
023    import javax.jms.IllegalStateException;
024    
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.ConsumerId;
027    import org.apache.activemq.command.MessageDispatch;
028    import org.apache.activemq.selector.SelectorParser;
029    
/**
 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
 * queue without removing them.
 * <P>
 * The <CODE>getEnumeration</CODE> method returns a
 * <CODE>java.util.Enumeration</CODE> that is used to scan the queue's
 * messages. It may be an enumeration of the entire content of a queue, or it
 * may contain only the messages matching a message selector.
 * <P>
 * Messages may be arriving and expiring while the scan is done. The JMS API
 * does not require the content of an enumeration to be a static snapshot of
 * queue content. Whether these changes are visible or not depends on the JMS
 * provider.
 * <P>
 * A <CODE>QueueBrowser</CODE> can be created from either a
 * <CODE>Session</CODE> or a <CODE>QueueSession</CODE>.
 *
 * @see javax.jms.Session#createBrowser
 * @see javax.jms.QueueSession#createBrowser
 * @see javax.jms.QueueBrowser
 * @see javax.jms.QueueReceiver
 */
054    
055    public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
056    
057        private final ActiveMQSession session;
058        private final ActiveMQDestination destination;
059        private final String selector;
060    
061        private ActiveMQMessageConsumer consumer;
062        private boolean closed;
063        private final ConsumerId consumerId;
064        private final AtomicBoolean browseDone = new AtomicBoolean(true);
065        private final boolean dispatchAsync;
066        private Object semaphore = new Object();
067    
068        /**
069         * Constructor for an ActiveMQQueueBrowser - used internally
070         * @throws JMSException
071         */
072        protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
073            if (destination == null) {
074                throw new InvalidDestinationException("Don't understand null destinations");
075            } else if (destination.getPhysicalName() == null) {
076                throw new InvalidDestinationException("The destination object was not given a physical name.");
077            }
078            if (selector != null && selector.trim().length() != 0) {
079                // Validate the selector
080                SelectorParser.parse(selector);
081            }
082    
083            this.session = session;
084            this.consumerId = consumerId;
085            this.destination = destination;
086            this.selector = selector;
087            this.dispatchAsync = dispatchAsync;
088        }
089    
090        /**
091         * @throws JMSException
092         */
093        private ActiveMQMessageConsumer createConsumer() throws JMSException {
094            browseDone.set(false);
095            ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
096    
097            return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
098                .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
099                public void dispatch(MessageDispatch md) {
100                    if (md.getMessage() == null) {
101                        browseDone.set(true);
102                    } else {
103                        super.dispatch(md);
104                    }
105                    notifyMessageAvailable();
106                }
107            };
108        }
109    
110        private void destroyConsumer() {
111            if (consumer == null) {
112                return;
113            }
114            try {
115                if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) {
116                    session.commit();
117                }
118                consumer.close();
119                consumer = null;
120            } catch (JMSException e) {
121                e.printStackTrace();
122            }
123        }
124    
125        /**
126         * Gets an enumeration for browsing the current queue messages in the order
127         * they would be received.
128         *
129         * @return an enumeration for browsing the messages
130         * @throws JMSException if the JMS provider fails to get the enumeration for
131         *                 this browser due to some internal error.
132         */
133    
134        public Enumeration getEnumeration() throws JMSException {
135            checkClosed();
136            if (consumer == null) {
137                consumer = createConsumer();
138            }
139            return this;
140        }
141    
142        private void checkClosed() throws IllegalStateException {
143            if (closed) {
144                throw new IllegalStateException("The Consumer is closed");
145            }
146        }
147    
148        /**
149         * @return true if more messages to process
150         */
151        public boolean hasMoreElements() {
152            while (true) {
153    
154                synchronized (this) {
155                    if (consumer == null) {
156                        return false;
157                    }
158                }
159    
160                if (consumer.getMessageSize() > 0) {
161                    return true;
162                }
163    
164                if (browseDone.get() || !session.isRunning()) {
165                    destroyConsumer();
166                    return false;
167                }
168    
169                waitForMessage();
170            }
171        }
172    
173        /**
174         * @return the next message
175         */
176        public Object nextElement() {
177            while (true) {
178    
179                synchronized (this) {
180                    if (consumer == null) {
181                        return null;
182                    }
183                }
184    
185                try {
186                    javax.jms.Message answer = consumer.receiveNoWait();
187                    if (answer != null) {
188                        return answer;
189                    }
190                } catch (JMSException e) {
191                    this.session.connection.onClientInternalException(e);
192                    return null;
193                }
194    
195                if (browseDone.get() || !session.isRunning()) {
196                    destroyConsumer();
197                    return null;
198                }
199    
200                waitForMessage();
201            }
202        }
203    
204        public synchronized void close() throws JMSException {
205            destroyConsumer();
206            closed = true;
207        }
208    
209        /**
210         * Gets the queue associated with this queue browser.
211         *
212         * @return the queue
213         * @throws JMSException if the JMS provider fails to get the queue
214         *                 associated with this browser due to some internal error.
215         */
216    
217        public Queue getQueue() throws JMSException {
218            return (Queue)destination;
219        }
220    
221        public String getMessageSelector() throws JMSException {
222            return selector;
223        }
224    
225        // Implementation methods
226        // -------------------------------------------------------------------------
227    
228        /**
229         * Wait on a semaphore for a fixed amount of time for a message to come in.
230         * @throws JMSException
231         */
232        protected void waitForMessage() {
233            try {
234                consumer.sendPullCommand(-1);
235                synchronized (semaphore) {
236                    semaphore.wait(2000);
237                }
238            } catch (InterruptedException e) {
239                Thread.currentThread().interrupt();
240            } catch (JMSException e) {
241            }
242    
243        }
244    
245        protected void notifyMessageAvailable() {
246            synchronized (semaphore) {
247                semaphore.notifyAll();
248            }
249        }
250    
251        public String toString() {
252            return "ActiveMQQueueBrowser { value=" + consumerId + " }";
253        }
254    
255    }