001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.util.concurrent.atomic.AtomicInteger;
020    
021    import javax.jms.ConnectionFactory;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.MessageConsumer;
026    import javax.jms.MessageListener;
027    import javax.jms.Topic;
028    
029    import org.apache.activemq.tool.properties.JmsClientProperties;
030    import org.apache.activemq.tool.properties.JmsConsumerProperties;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    public class JmsConsumerClient extends AbstractJmsMeasurableClient {
035        private static final Logger LOG = LoggerFactory.getLogger(JmsConsumerClient.class);
036    
037        protected MessageConsumer jmsConsumer;
038        protected JmsConsumerProperties client;
039    
040        public JmsConsumerClient(ConnectionFactory factory) {
041            this(new JmsConsumerProperties(), factory);
042        }
043    
044        public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) {
045            super(factory);
046            client = clientProps;
047        }
048    
049        public void receiveMessages() throws JMSException {
050            if (client.isAsyncRecv()) {
051                if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
052                    receiveAsyncTimeBasedMessages(client.getRecvDuration());
053                } else {
054                    receiveAsyncCountBasedMessages(client.getRecvCount());
055                }
056            } else {
057                if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
058                    receiveSyncTimeBasedMessages(client.getRecvDuration());
059                } else {
060                    receiveSyncCountBasedMessages(client.getRecvCount());
061                }
062            }
063        }
064    
065        public void receiveMessages(int destCount) throws JMSException {
066            this.destCount = destCount;
067            receiveMessages();
068        }
069    
070        public void receiveMessages(int destIndex, int destCount) throws JMSException {
071            this.destIndex = destIndex;
072            receiveMessages(destCount);
073        }
074    
075        public void receiveSyncTimeBasedMessages(long duration) throws JMSException {
076            if (getJmsConsumer() == null) {
077                createJmsConsumer();
078            }
079    
080            try {
081                getConnection().start();
082    
083                LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
084                long endTime = System.currentTimeMillis() + duration;
085    
086                int counter = 0;
087                while (System.currentTimeMillis() < endTime) {
088                    getJmsConsumer().receive();
089                    incThroughput();
090                    counter++;
091                    sleep();
092                    commitTxIfNecessary();
093                }
094            } finally {
095                if (client.isDurable() && client.isUnsubscribe()) {
096                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
097                    getJmsConsumer().close();
098                    getSession().unsubscribe(getClientName());
099                }
100                getConnection().close();
101            }
102        }
103    
104        public void receiveSyncCountBasedMessages(long count) throws JMSException {
105            if (getJmsConsumer() == null) {
106                createJmsConsumer();
107            }
108    
109            try {
110                getConnection().start();
111                LOG.info("Starting to synchronously receive " + count + " messages...");
112    
113                int recvCount = 0;
114                while (recvCount < count) {
115                    getJmsConsumer().receive();
116                    incThroughput();
117                    recvCount++;
118                    sleep();
119                    commitTxIfNecessary();
120                }
121            } finally {
122                if (client.isDurable() && client.isUnsubscribe()) {
123                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
124                    getJmsConsumer().close();
125                    getSession().unsubscribe(getClientName());
126                }
127                getConnection().close();
128            }
129        }
130    
131        public void receiveAsyncTimeBasedMessages(long duration) throws JMSException {
132            if (getJmsConsumer() == null) {
133                createJmsConsumer();
134            }
135    
136            getJmsConsumer().setMessageListener(new MessageListener() {
137                public void onMessage(Message msg) {
138                    incThroughput();
139                    sleep();
140                    try {
141                            commitTxIfNecessary();
142                    } catch (JMSException ex) {
143                            LOG.error("Error committing transaction: " + ex.getMessage());
144                    }
145                }
146            });
147    
148            try {
149                getConnection().start();
150                LOG.info("Starting to asynchronously receive messages for " + duration + " ms...");
151                try {
152                    Thread.sleep(duration);
153                } catch (InterruptedException e) {
154                    throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
155                }
156            } finally {
157                if (client.isDurable() && client.isUnsubscribe()) {
158                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
159                    getJmsConsumer().close();
160                    getSession().unsubscribe(getClientName());
161                }
162                getConnection().close();
163            }
164        }
165    
166        public void receiveAsyncCountBasedMessages(long count) throws JMSException {
167            if (getJmsConsumer() == null) {
168                createJmsConsumer();
169            }
170    
171            final AtomicInteger recvCount = new AtomicInteger(0);
172            getJmsConsumer().setMessageListener(new MessageListener() {
173                public void onMessage(Message msg) {
174                    incThroughput();
175                    recvCount.incrementAndGet();
176                    synchronized (recvCount) {
177                        recvCount.notify();
178                    } 
179                    
180                    try {
181                            commitTxIfNecessary();
182                    } catch (JMSException ex) {
183                            LOG.error("Error committing transaction: " + ex.getMessage());
184                    }
185                }
186            });
187    
188            try {
189                getConnection().start();
190                LOG.info("Starting to asynchronously receive " + client.getRecvCount() + " messages...");
191                try {
192                    while (recvCount.get() < count) {
193                        synchronized (recvCount) {
194                            recvCount.wait();
195                        }
196                    }
197                } catch (InterruptedException e) {
198                    throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
199                }
200            } finally {
201                if (client.isDurable() && client.isUnsubscribe()) {
202                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
203                    getJmsConsumer().close();
204                    getSession().unsubscribe(getClientName());
205                }
206                getConnection().close();
207            }
208        }
209    
210        public MessageConsumer createJmsConsumer() throws JMSException {
211            Destination[] dest = createDestination(destIndex, destCount);
212            
213            if (this.client.getMessageSelector() == null)
214                    return createJmsConsumer(dest[0]);
215            else 
216                    return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
217        }
218    
219        public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
220            if (client.isDurable()) {
221                String clientName = getClientName();
222                if (clientName == null) {
223                    clientName = "JmsConsumer";
224                    setClientName(clientName);
225                }
226                LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
227                jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName);
228            } else {
229                LOG.info("Creating non-durable consumer to: " + dest.toString());
230                jmsConsumer = getSession().createConsumer(dest);
231            }
232            return jmsConsumer;
233        }
234    
235        public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
236            if (client.isDurable()) {
237                String clientName = getClientName();
238                if (clientName == null) {
239                    clientName = "JmsConsumer";
240                    setClientName(clientName);
241                }
242                LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
243                jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
244            } else {
245                LOG.info("Creating non-durable consumer to: " + dest.toString());
246                jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
247            }
248            return jmsConsumer;
249        }
250    
251        public MessageConsumer getJmsConsumer() {
252            return jmsConsumer;
253        }
254    
255        public JmsClientProperties getClient() {
256            return client;
257        }
258    
259        public void setClient(JmsClientProperties clientProps) {
260            client = (JmsConsumerProperties)clientProps;
261        }
262        
263        /**
264         * A way to throttle the consumer. Time to sleep is 
265         * configured via recvDelay property. 
266         */
267        protected void sleep() {
268            if (client.getRecvDelay() > 0) {
269                    try {
270                            LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
271                            Thread.sleep(client.getRecvDelay());
272                    } catch (java.lang.InterruptedException ex) {
273                            LOG.warn(ex.getMessage());
274                    }
275            }
276        }
277    }