001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.tool;
018
019 import java.util.concurrent.atomic.AtomicInteger;
020
021 import javax.jms.ConnectionFactory;
022 import javax.jms.Destination;
023 import javax.jms.JMSException;
024 import javax.jms.Message;
025 import javax.jms.MessageConsumer;
026 import javax.jms.MessageListener;
027 import javax.jms.Topic;
028
029 import org.apache.activemq.tool.properties.JmsClientProperties;
030 import org.apache.activemq.tool.properties.JmsConsumerProperties;
031 import org.slf4j.Logger;
032 import org.slf4j.LoggerFactory;
033
034 public class JmsConsumerClient extends AbstractJmsMeasurableClient {
035 private static final Logger LOG = LoggerFactory.getLogger(JmsConsumerClient.class);
036
037 protected MessageConsumer jmsConsumer;
038 protected JmsConsumerProperties client;
039
040 public JmsConsumerClient(ConnectionFactory factory) {
041 this(new JmsConsumerProperties(), factory);
042 }
043
044 public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) {
045 super(factory);
046 client = clientProps;
047 }
048
049 public void receiveMessages() throws JMSException {
050 if (client.isAsyncRecv()) {
051 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
052 receiveAsyncTimeBasedMessages(client.getRecvDuration());
053 } else {
054 receiveAsyncCountBasedMessages(client.getRecvCount());
055 }
056 } else {
057 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
058 receiveSyncTimeBasedMessages(client.getRecvDuration());
059 } else {
060 receiveSyncCountBasedMessages(client.getRecvCount());
061 }
062 }
063 }
064
065 public void receiveMessages(int destCount) throws JMSException {
066 this.destCount = destCount;
067 receiveMessages();
068 }
069
070 public void receiveMessages(int destIndex, int destCount) throws JMSException {
071 this.destIndex = destIndex;
072 receiveMessages(destCount);
073 }
074
075 public void receiveSyncTimeBasedMessages(long duration) throws JMSException {
076 if (getJmsConsumer() == null) {
077 createJmsConsumer();
078 }
079
080 try {
081 getConnection().start();
082
083 LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
084 long endTime = System.currentTimeMillis() + duration;
085
086 int counter = 0;
087 while (System.currentTimeMillis() < endTime) {
088 getJmsConsumer().receive();
089 incThroughput();
090 counter++;
091 sleep();
092 commitTxIfNecessary();
093 }
094 } finally {
095 if (client.isDurable() && client.isUnsubscribe()) {
096 LOG.info("Unsubscribing durable subscriber: " + getClientName());
097 getJmsConsumer().close();
098 getSession().unsubscribe(getClientName());
099 }
100 getConnection().close();
101 }
102 }
103
104 public void receiveSyncCountBasedMessages(long count) throws JMSException {
105 if (getJmsConsumer() == null) {
106 createJmsConsumer();
107 }
108
109 try {
110 getConnection().start();
111 LOG.info("Starting to synchronously receive " + count + " messages...");
112
113 int recvCount = 0;
114 while (recvCount < count) {
115 getJmsConsumer().receive();
116 incThroughput();
117 recvCount++;
118 sleep();
119 commitTxIfNecessary();
120 }
121 } finally {
122 if (client.isDurable() && client.isUnsubscribe()) {
123 LOG.info("Unsubscribing durable subscriber: " + getClientName());
124 getJmsConsumer().close();
125 getSession().unsubscribe(getClientName());
126 }
127 getConnection().close();
128 }
129 }
130
131 public void receiveAsyncTimeBasedMessages(long duration) throws JMSException {
132 if (getJmsConsumer() == null) {
133 createJmsConsumer();
134 }
135
136 getJmsConsumer().setMessageListener(new MessageListener() {
137 public void onMessage(Message msg) {
138 incThroughput();
139 sleep();
140 try {
141 commitTxIfNecessary();
142 } catch (JMSException ex) {
143 LOG.error("Error committing transaction: " + ex.getMessage());
144 }
145 }
146 });
147
148 try {
149 getConnection().start();
150 LOG.info("Starting to asynchronously receive messages for " + duration + " ms...");
151 try {
152 Thread.sleep(duration);
153 } catch (InterruptedException e) {
154 throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
155 }
156 } finally {
157 if (client.isDurable() && client.isUnsubscribe()) {
158 LOG.info("Unsubscribing durable subscriber: " + getClientName());
159 getJmsConsumer().close();
160 getSession().unsubscribe(getClientName());
161 }
162 getConnection().close();
163 }
164 }
165
166 public void receiveAsyncCountBasedMessages(long count) throws JMSException {
167 if (getJmsConsumer() == null) {
168 createJmsConsumer();
169 }
170
171 final AtomicInteger recvCount = new AtomicInteger(0);
172 getJmsConsumer().setMessageListener(new MessageListener() {
173 public void onMessage(Message msg) {
174 incThroughput();
175 recvCount.incrementAndGet();
176 synchronized (recvCount) {
177 recvCount.notify();
178 }
179
180 try {
181 commitTxIfNecessary();
182 } catch (JMSException ex) {
183 LOG.error("Error committing transaction: " + ex.getMessage());
184 }
185 }
186 });
187
188 try {
189 getConnection().start();
190 LOG.info("Starting to asynchronously receive " + client.getRecvCount() + " messages...");
191 try {
192 while (recvCount.get() < count) {
193 synchronized (recvCount) {
194 recvCount.wait();
195 }
196 }
197 } catch (InterruptedException e) {
198 throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
199 }
200 } finally {
201 if (client.isDurable() && client.isUnsubscribe()) {
202 LOG.info("Unsubscribing durable subscriber: " + getClientName());
203 getJmsConsumer().close();
204 getSession().unsubscribe(getClientName());
205 }
206 getConnection().close();
207 }
208 }
209
210 public MessageConsumer createJmsConsumer() throws JMSException {
211 Destination[] dest = createDestination(destIndex, destCount);
212
213 if (this.client.getMessageSelector() == null)
214 return createJmsConsumer(dest[0]);
215 else
216 return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
217 }
218
219 public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
220 if (client.isDurable()) {
221 String clientName = getClientName();
222 if (clientName == null) {
223 clientName = "JmsConsumer";
224 setClientName(clientName);
225 }
226 LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
227 jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName);
228 } else {
229 LOG.info("Creating non-durable consumer to: " + dest.toString());
230 jmsConsumer = getSession().createConsumer(dest);
231 }
232 return jmsConsumer;
233 }
234
235 public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
236 if (client.isDurable()) {
237 String clientName = getClientName();
238 if (clientName == null) {
239 clientName = "JmsConsumer";
240 setClientName(clientName);
241 }
242 LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
243 jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
244 } else {
245 LOG.info("Creating non-durable consumer to: " + dest.toString());
246 jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
247 }
248 return jmsConsumer;
249 }
250
251 public MessageConsumer getJmsConsumer() {
252 return jmsConsumer;
253 }
254
255 public JmsClientProperties getClient() {
256 return client;
257 }
258
259 public void setClient(JmsClientProperties clientProps) {
260 client = (JmsConsumerProperties)clientProps;
261 }
262
263 /**
264 * A way to throttle the consumer. Time to sleep is
265 * configured via recvDelay property.
266 */
267 protected void sleep() {
268 if (client.getRecvDelay() > 0) {
269 try {
270 LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
271 Thread.sleep(client.getRecvDelay());
272 } catch (java.lang.InterruptedException ex) {
273 LOG.warn(ex.getMessage());
274 }
275 }
276 }
277 }