001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.io.BufferedReader;
020    import java.io.File;
021    import java.io.FileNotFoundException;
022    import java.io.FileReader;
023    import java.io.IOException;
024    
025    import java.util.Arrays;
026    import java.util.Set;
027    
028    import javax.jms.ConnectionFactory;
029    import javax.jms.DeliveryMode;
030    import javax.jms.Destination;
031    import javax.jms.JMSException;
032    import javax.jms.MessageProducer;
033    import javax.jms.TextMessage;
034    
035    import org.apache.activemq.tool.properties.JmsClientProperties;
036    import org.apache.activemq.tool.properties.JmsProducerProperties;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    public class JmsProducerClient extends AbstractJmsMeasurableClient {
041        private static final Logger LOG = LoggerFactory.getLogger(JmsProducerClient.class);
042    
043        protected JmsProducerProperties client;
044        protected MessageProducer jmsProducer;
045        protected TextMessage jmsTextMessage;
046    
047        public JmsProducerClient(ConnectionFactory factory) {
048            this(new JmsProducerProperties(), factory);
049        }
050    
051        public JmsProducerClient(JmsProducerProperties clientProps, ConnectionFactory factory) {
052            super(factory);
053            this.client = clientProps;
054        }
055    
056        public void sendMessages() throws JMSException {
057            // Send a specific number of messages
058            if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) {
059                sendCountBasedMessages(client.getSendCount());
060    
061            // Send messages for a specific duration
062            } else {
063                sendTimeBasedMessages(client.getSendDuration());
064            }
065        }
066    
067        public void sendMessages(int destCount) throws JMSException {
068            this.destCount = destCount;
069            sendMessages();
070        }
071    
072        public void sendMessages(int destIndex, int destCount) throws JMSException {
073            this.destIndex = destIndex;
074            sendMessages(destCount);
075        }
076    
077        public void sendCountBasedMessages(long messageCount) throws JMSException {
078            // Parse through different ways to send messages
079            // Avoided putting the condition inside the loop to prevent effect on performance
080            Destination[] dest = createDestination(destIndex, destCount);
081    
082            // Create a producer, if none is created.
083            if (getJmsProducer() == null) {
084                if (dest.length == 1) {
085                    createJmsProducer(dest[0]);
086                } else {
087                    createJmsProducer();
088                }
089            }
090            try {
091                getConnection().start();
092                if (client.getMsgFileName() != null) {
093                    LOG.info("Starting to publish " +
094                            messageCount + 
095                            " messages from file " + 
096                            client.getMsgFileName()
097                    );
098                } else {
099                    LOG.info("Starting to publish " +
100                            messageCount +
101                            " messages of size " +
102                            client.getMessageSize() + 
103                            " byte(s)." 
104                    );
105                }
106    
107                // Send one type of message only, avoiding the creation of different messages on sending
108                if (!client.isCreateNewMsg()) {
109                    // Create only a single message
110                    createJmsTextMessage();
111    
112                    // Send to more than one actual destination
113                    if (dest.length > 1) {
114                        for (int i = 0; i < messageCount; i++) {
115                            for (int j = 0; j < dest.length; j++) {
116                                getJmsProducer().send(dest[j], getJmsTextMessage());
117                                incThroughput();
118                                sleep();
119                                commitTxIfNecessary();
120                            }
121                        }
122                        // Send to only one actual destination
123                    } else {
124                        for (int i = 0; i < messageCount; i++) {
125                            getJmsProducer().send(getJmsTextMessage());
126                            incThroughput();
127                            sleep();
128                            commitTxIfNecessary();
129                        }
130                    }
131    
132                    // Send different type of messages using indexing to identify each one.
133                    // Message size will vary. Definitely slower, since messages properties
134                    // will be set individually each send.
135                } else {
136                    // Send to more than one actual destination
137                    if (dest.length > 1) {
138                        for (int i = 0; i < messageCount; i++) {
139                            for (int j = 0; j < dest.length; j++) {
140                                getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
141                                incThroughput();
142                                sleep();
143                                commitTxIfNecessary();
144                            }
145                        }
146    
147                        // Send to only one actual destination
148                    } else {
149                        for (int i = 0; i < messageCount; i++) {
150                            getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
151                            incThroughput();
152                            sleep();
153                            commitTxIfNecessary();
154                        }
155                    }
156                }
157            } finally {
158                getConnection().close();
159            }
160        }
161    
162        public void sendTimeBasedMessages(long duration) throws JMSException {
163            long endTime = System.currentTimeMillis() + duration;
164            // Parse through different ways to send messages
165            // Avoided putting the condition inside the loop to prevent effect on performance
166    
167            Destination[] dest = createDestination(destIndex, destCount);
168    
169            // Create a producer, if none is created.
170            if (getJmsProducer() == null) {
171                if (dest.length == 1) {
172                    createJmsProducer(dest[0]);
173                } else {
174                    createJmsProducer();
175                }
176            }
177    
178            try {
179                getConnection().start();
180                if (client.getMsgFileName() != null) {
181                    LOG.info("Starting to publish messages from file " + 
182                                    client.getMsgFileName() + 
183                                    " for " +
184                                    duration + 
185                                    " ms");
186                } else {
187                    LOG.info("Starting to publish " + 
188                                    client.getMessageSize() + 
189                                    " byte(s) messages for " + 
190                                    duration + 
191                                    " ms");
192                }
193                // Send one type of message only, avoiding the creation of different messages on sending
194                if (!client.isCreateNewMsg()) {
195                    // Create only a single message
196                    createJmsTextMessage();
197    
198                    // Send to more than one actual destination
199                    if (dest.length > 1) {
200                        while (System.currentTimeMillis() < endTime) {
201                            for (int j = 0; j < dest.length; j++) {
202                                getJmsProducer().send(dest[j], getJmsTextMessage());
203                                incThroughput();
204                                sleep();
205                                commitTxIfNecessary();
206                            }
207                        }
208                        // Send to only one actual destination
209                    } else {
210                        while (System.currentTimeMillis() < endTime) {
211                            getJmsProducer().send(getJmsTextMessage());
212                            incThroughput();
213                            sleep();
214                            commitTxIfNecessary();
215                        }
216                    }
217    
218                    // Send different type of messages using indexing to identify each one.
219                    // Message size will vary. Definitely slower, since messages properties
220                    // will be set individually each send.
221                } else {
222                    // Send to more than one actual destination
223                    long count = 1;
224                    if (dest.length > 1) {
225                        while (System.currentTimeMillis() < endTime) {
226                            for (int j = 0; j < dest.length; j++) {
227                                getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
228                                incThroughput();
229                                sleep();
230                                commitTxIfNecessary();
231                            }
232                        }
233    
234                        // Send to only one actual destination
235                    } else {
236                        while (System.currentTimeMillis() < endTime) {
237    
238                            getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
239                            incThroughput();
240                            sleep();
241                            commitTxIfNecessary();
242                        }
243                    }
244                }
245            } finally {
246                getConnection().close();
247            }
248        }
249    
250        public MessageProducer createJmsProducer() throws JMSException {
251            jmsProducer = getSession().createProducer(null);
252            if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) {
253                LOG.info("Creating producer to possible multiple destinations with persistent delivery.");
254                jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
255            } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) {
256                LOG.info("Creating producer to possible multiple destinations with non-persistent delivery.");
257                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
258            } else {
259                LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent.");
260                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
261            }
262            return jmsProducer;
263        }
264    
265        public MessageProducer createJmsProducer(Destination dest) throws JMSException {
266            jmsProducer = getSession().createProducer(dest);
267            if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) {
268                LOG.info("Creating producer to: " + dest.toString() + " with persistent delivery.");
269                jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
270            } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) {
271                LOG.info("Creating  producer to: " + dest.toString() + " with non-persistent delivery.");
272                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
273            } else {
274                LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent.");
275                jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
276            }
277            return jmsProducer;
278        }
279    
280        public MessageProducer getJmsProducer() {
281            return jmsProducer;
282        }
283    
284        public TextMessage createJmsTextMessage() throws JMSException {
285            if (client.getMsgFileName() != null) {
286                    return loadJmsMessage();
287            } else {
288              return createJmsTextMessage(client.getMessageSize());
289            }
290        }
291    
292        public TextMessage createJmsTextMessage(int size) throws JMSException {
293            jmsTextMessage = getSession().createTextMessage(buildText("", size));
294            
295            // support for adding message headers
296            Set<String> headerKeys = this.client.getHeaderKeys();
297            for (String key : headerKeys) {
298                    jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
299            }
300            
301            return jmsTextMessage;
302        }
303    
304        public TextMessage createJmsTextMessage(String text) throws JMSException {
305            jmsTextMessage = getSession().createTextMessage(buildText(text, client.getMessageSize()));
306            return jmsTextMessage;
307        }
308    
309        public TextMessage getJmsTextMessage() {
310            return jmsTextMessage;
311        }
312    
313        public JmsClientProperties getClient() {
314            return client;
315        }
316    
317        public void setClient(JmsClientProperties clientProps) {
318            client = (JmsProducerProperties)clientProps;
319        }
320    
321        protected String buildText(String text, int size) {
322            byte[] data = new byte[size - text.length()];
323            Arrays.fill(data, (byte) 0);
324            return text + new String(data);
325        }
326        
327        protected void sleep() {
328            if (client.getSendDelay() > 0) {
329                    try {
330                            LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
331                            Thread.sleep(client.getSendDelay());
332                    } catch (java.lang.InterruptedException ex) {
333                            LOG.warn(ex.getMessage());
334                    }
335            }
336        }
337        
338        /**
339         * loads the message to be sent from the specified TextFile
340         */
341        protected TextMessage loadJmsMessage() throws JMSException {
342            try {
343                    // couple of sanity checks upfront 
344                    if (client.getMsgFileName() == null) {
345                            throw new JMSException("Invalid filename specified.");
346                    }
347                    
348                    File f = new File(client.getMsgFileName());
349                    if (f.isDirectory()) {
350                            throw new JMSException("Cannot load from " + 
351                                            client.getMsgFileName() + 
352                                            " as it is a directory not a text file.");
353                    } 
354                    
355                    // try to load file
356                    BufferedReader br = new BufferedReader(new FileReader(f));
357                    StringBuffer payload = new StringBuffer();
358                    String tmp = null;
359                    while ((tmp = br.readLine()) != null) {
360                            payload.append(tmp);
361                    }
362                    jmsTextMessage = getSession().createTextMessage(payload.toString());
363                    return jmsTextMessage;
364                    
365            } catch (FileNotFoundException ex) {
366                    throw new JMSException(ex.getMessage());
367            } catch (IOException iox) {
368                    throw new JMSException(iox.getMessage());
369            }
370        }
371    }