001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.util.Enumeration;
020 import java.util.concurrent.atomic.AtomicBoolean;
021
022 import javax.jms.*;
023 import javax.jms.IllegalStateException;
024
025 import org.apache.activemq.command.ActiveMQDestination;
026 import org.apache.activemq.command.ConsumerId;
027 import org.apache.activemq.command.MessageDispatch;
028 import org.apache.activemq.selector.SelectorParser;
029
030 /**
031 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
032 * queue without removing them. <p/>
033 * <P>
034 * The <CODE>getEnumeration</CODE> method returns a <CODE>
035 * java.util.Enumeration</CODE>
036 * that is used to scan the queue's messages. It may be an enumeration of the
037 * entire content of a queue, or it may contain only the messages matching a
038 * message selector. <p/>
039 * <P>
040 * Messages may be arriving and expiring while the scan is done. The JMS API
041 * does not require the content of an enumeration to be a static snapshot of
042 * queue content. Whether these changes are visible or not depends on the JMS
043 * provider. <p/>
044 * <P>
045 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
046 * </CODE>
047 * or a <CODE>QueueSession</CODE>.
048 *
049 * @see javax.jms.Session#createBrowser
050 * @see javax.jms.QueueSession#createBrowser
051 * @see javax.jms.QueueBrowser
052 * @see javax.jms.QueueReceiver
053 */
054
055 public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
056
057 private final ActiveMQSession session;
058 private final ActiveMQDestination destination;
059 private final String selector;
060
061 private ActiveMQMessageConsumer consumer;
062 private boolean closed;
063 private final ConsumerId consumerId;
064 private final AtomicBoolean browseDone = new AtomicBoolean(true);
065 private final boolean dispatchAsync;
066 private Object semaphore = new Object();
067
068 /**
069 * Constructor for an ActiveMQQueueBrowser - used internally
070 * @throws JMSException
071 */
072 protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
073 if (destination == null) {
074 throw new InvalidDestinationException("Don't understand null destinations");
075 } else if (destination.getPhysicalName() == null) {
076 throw new InvalidDestinationException("The destination object was not given a physical name.");
077 }
078 if (selector != null && selector.trim().length() != 0) {
079 // Validate the selector
080 SelectorParser.parse(selector);
081 }
082
083 this.session = session;
084 this.consumerId = consumerId;
085 this.destination = destination;
086 this.selector = selector;
087 this.dispatchAsync = dispatchAsync;
088 }
089
090 /**
091 * @throws JMSException
092 */
093 private ActiveMQMessageConsumer createConsumer() throws JMSException {
094 browseDone.set(false);
095 ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
096
097 return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
098 .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
099 public void dispatch(MessageDispatch md) {
100 if (md.getMessage() == null) {
101 browseDone.set(true);
102 } else {
103 super.dispatch(md);
104 }
105 notifyMessageAvailable();
106 }
107 };
108 }
109
110 private void destroyConsumer() {
111 if (consumer == null) {
112 return;
113 }
114 try {
115 if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) {
116 session.commit();
117 }
118 consumer.close();
119 consumer = null;
120 } catch (JMSException e) {
121 e.printStackTrace();
122 }
123 }
124
125 /**
126 * Gets an enumeration for browsing the current queue messages in the order
127 * they would be received.
128 *
129 * @return an enumeration for browsing the messages
130 * @throws JMSException if the JMS provider fails to get the enumeration for
131 * this browser due to some internal error.
132 */
133
134 public Enumeration getEnumeration() throws JMSException {
135 checkClosed();
136 if (consumer == null) {
137 consumer = createConsumer();
138 }
139 return this;
140 }
141
142 private void checkClosed() throws IllegalStateException {
143 if (closed) {
144 throw new IllegalStateException("The Consumer is closed");
145 }
146 }
147
148 /**
149 * @return true if more messages to process
150 */
151 public boolean hasMoreElements() {
152 while (true) {
153
154 synchronized (this) {
155 if (consumer == null) {
156 return false;
157 }
158 }
159
160 if (consumer.getMessageSize() > 0) {
161 return true;
162 }
163
164 if (browseDone.get() || !session.isRunning()) {
165 destroyConsumer();
166 return false;
167 }
168
169 waitForMessage();
170 }
171 }
172
173 /**
174 * @return the next message
175 */
176 public Object nextElement() {
177 while (true) {
178
179 synchronized (this) {
180 if (consumer == null) {
181 return null;
182 }
183 }
184
185 try {
186 javax.jms.Message answer = consumer.receiveNoWait();
187 if (answer != null) {
188 return answer;
189 }
190 } catch (JMSException e) {
191 this.session.connection.onClientInternalException(e);
192 return null;
193 }
194
195 if (browseDone.get() || !session.isRunning()) {
196 destroyConsumer();
197 return null;
198 }
199
200 waitForMessage();
201 }
202 }
203
204 public synchronized void close() throws JMSException {
205 destroyConsumer();
206 closed = true;
207 }
208
209 /**
210 * Gets the queue associated with this queue browser.
211 *
212 * @return the queue
213 * @throws JMSException if the JMS provider fails to get the queue
214 * associated with this browser due to some internal error.
215 */
216
217 public Queue getQueue() throws JMSException {
218 return (Queue)destination;
219 }
220
221 public String getMessageSelector() throws JMSException {
222 return selector;
223 }
224
225 // Implementation methods
226 // -------------------------------------------------------------------------
227
228 /**
229 * Wait on a semaphore for a fixed amount of time for a message to come in.
230 * @throws JMSException
231 */
232 protected void waitForMessage() {
233 try {
234 consumer.sendPullCommand(-1);
235 synchronized (semaphore) {
236 semaphore.wait(2000);
237 }
238 } catch (InterruptedException e) {
239 Thread.currentThread().interrupt();
240 } catch (JMSException e) {
241 }
242
243 }
244
245 protected void notifyMessageAvailable() {
246 synchronized (semaphore) {
247 semaphore.notifyAll();
248 }
249 }
250
251 public String toString() {
252 return "ActiveMQQueueBrowser { value=" + consumerId + " }";
253 }
254
255 }