001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.tool;
019    
020    import java.util.Properties;
021    
022    import javax.jms.BytesMessage;
023    import javax.jms.Connection;
024    import javax.jms.ConnectionFactory;
025    import javax.jms.DeliveryMode;
026    import javax.jms.Destination;
027    import javax.jms.JMSException;
028    import javax.jms.Message;
029    import javax.jms.Session;
030    
031    import org.apache.activemq.ActiveMQConnectionFactory;
032    import org.apache.activemq.broker.BrokerService;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    
037    public class JMSMemtest {
038    
039        private static final Logger LOG = LoggerFactory.getLogger(JMSMemtest.class);
040        private static final int DEFAULT_MESSAGECOUNT = 5000;
041        
042        protected BrokerService broker;
043        protected boolean topic = true;
044        protected boolean durable;
045        protected long messageCount;
046    
047        //  how large the message in kb before we close/start the producer/consumer with a new connection.  -1 means no connectionCheckpointSize
048        protected int connectionCheckpointSize;
049        protected long connectionInterval;
050    
051    
052        protected int consumerCount;
053        protected int producerCount;
054        protected int checkpointInterval;
055        protected int prefetchSize;
056        //set 10 kb of payload as default
057        protected int messageSize;
058    
059        protected String reportDirectory;
060        protected String reportName;
061    
062    
063        protected String url = "";
064        protected MemProducer[] producers;
065        protected MemConsumer[] consumers;
066        protected String destinationName;
067        protected boolean allMessagesConsumed = true;
068        protected MemConsumer allMessagesList = new MemConsumer();
069    
070        protected Message payload;
071    
072        protected ActiveMQConnectionFactory connectionFactory;
073        protected Connection connection;
074        protected Destination destination;
075    
076    
077        protected boolean createConnectionPerClient = true;
078    
079        protected boolean transacted;
080        protected boolean useEmbeddedBroker = true;
081        protected MemoryMonitoringTool memoryMonitoringTool;
082    
083        public JMSMemtest(Properties settings) {
084            url = settings.getProperty("url");
085            topic = new Boolean(settings.getProperty("topic")).booleanValue();
086            durable = new Boolean(settings.getProperty("durable")).booleanValue();
087            connectionCheckpointSize = new Integer(settings.getProperty("connectionCheckpointSize")).intValue();
088            producerCount = new Integer(settings.getProperty("producerCount")).intValue();
089            consumerCount = new Integer(settings.getProperty("consumerCount")).intValue();
090            messageCount = new Integer(settings.getProperty("messageCount")).intValue();
091            messageSize = new Integer(settings.getProperty("messageSize")).intValue();
092            prefetchSize = new Integer(settings.getProperty("prefetchSize")).intValue();
093            checkpointInterval = new Integer(settings.getProperty("checkpointInterval")).intValue() * 1000;
094            producerCount = new Integer(settings.getProperty("producerCount")).intValue();
095            reportName = settings.getProperty("reportName");
096            destinationName = settings.getProperty("destinationName");
097            reportDirectory = settings.getProperty("reportDirectory");
098            connectionInterval = connectionCheckpointSize * 1024;
099        }
100    
101        public static void main(String[] args) {
102    
103    
104            Properties sysSettings = new Properties();
105    
106            for (int i = 0; i < args.length; i++) {
107    
108                int index = args[i].indexOf("=");
109                String key = args[i].substring(0, index);
110                String val = args[i].substring(index + 1);
111                sysSettings.setProperty(key, val);
112    
113            }
114    
115    
116            JMSMemtest memtest = new JMSMemtest(sysSettings);
117            try {
118                memtest.start();
119            } catch (Exception e) {
120    
121                e.printStackTrace();
122            }
123    
124        }
125    
126        protected void start() throws Exception {
127            LOG.info("Starting Monitor");
128            memoryMonitoringTool = new MemoryMonitoringTool();
129            memoryMonitoringTool.setTestSettings(getSysTestSettings());
130            Thread monitorThread = memoryMonitoringTool.startMonitor();
131    
132            if (messageCount == 0) {
133                messageCount = DEFAULT_MESSAGECOUNT;
134            }
135    
136    
137            if (useEmbeddedBroker) {
138                if (broker == null) {
139                    broker = createBroker();
140                }
141            }
142    
143    
144            connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory();
145            if (prefetchSize > 0) {
146                connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
147                connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
148            }
149    
150            connection = connectionFactory.createConnection();
151            Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
152    
153            if (topic) {
154                destination = session.createTopic(destinationName);
155            } else {
156                destination = session.createQueue(destinationName);
157            }
158    
159            createPayload(session);
160    
161            publishAndConsume();
162    
163            LOG.info("Closing resources");
164            this.close();
165    
166            monitorThread.join();
167    
168    
169        }
170    
171    
172        protected boolean resetConnection(int counter) {
173            if (connectionInterval > 0) {
174                long totalMsgSizeConsumed = counter * 1024;
175                if (connectionInterval < totalMsgSizeConsumed) {
176                    return true;
177                }
178            }
179            return false;
180        }
181    
182        protected void publishAndConsume() throws Exception {
183    
184            createConsumers();
185            createProducers();
186            int counter = 0;
187            boolean resetCon = false;
188            LOG.info("Start sending messages ");
189            for (int i = 0; i < messageCount; i++) {
190                if (resetCon) {
191                    closeConsumers();
192                    closeProducers();
193                    createConsumers();
194                    createProducers();
195                    resetCon = false;
196                }
197    
198                for (int k = 0; k < producers.length; k++) {
199                    producers[k].sendMessage(payload, "counter", counter);
200                    counter++;
201                    if (resetConnection(counter)) {
202                        resetCon = true;
203                        break;
204                    }
205                }
206            }
207        }
208    
209    
210        protected void close() throws Exception {
211            connection.close();
212            broker.stop();
213    
214            memoryMonitoringTool.stopMonitor();
215        }
216    
217        protected void createPayload(Session session) throws JMSException {
218    
219            byte[] array = new byte[messageSize];
220            for (int i = 0; i < array.length; i++) {
221                array[i] = (byte) i;
222            }
223    
224            BytesMessage bystePayload = session.createBytesMessage();
225            bystePayload.writeBytes(array);
226            payload = (Message) bystePayload;
227        }
228    
229    
230        protected void createProducers() throws JMSException {
231            producers = new MemProducer[producerCount];
232            for (int i = 0; i < producerCount; i++) {
233                producers[i] = new MemProducer(connectionFactory, destination);
234                if (durable) {
235                    producers[i].setDeliveryMode(DeliveryMode.PERSISTENT);
236                } else {
237                    producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
238                }
239                producers[i].start();
240            }
241    
242        }
243    
244        protected void createConsumers() throws JMSException {
245            consumers = new MemConsumer[consumerCount];
246            for (int i = 0; i < consumerCount; i++) {
247                consumers[i] = new MemConsumer(connectionFactory, destination);
248                consumers[i].setParent(allMessagesList);
249                consumers[i].start();
250    
251    
252            }
253        }
254    
255        protected void closeProducers() throws JMSException {
256            for (int i = 0; i < producerCount; i++) {
257                producers[i].shutDown();
258            }
259    
260        }
261    
262        protected void closeConsumers() throws JMSException {
263            for (int i = 0; i < consumerCount; i++) {
264                consumers[i].shutDown();
265            }
266        }
267    
268        protected ConnectionFactory createConnectionFactory() throws JMSException {
269    
270            if (url == null || url.trim().equals("") || url.trim().equals("null")) {
271                return new ActiveMQConnectionFactory("vm://localhost");
272            } else {
273                return new ActiveMQConnectionFactory(url);
274            }
275        }
276    
277        protected BrokerService createBroker() throws Exception {
278            BrokerService broker = new BrokerService();
279            configureBroker(broker);
280            broker.start();
281            return broker;
282        }
283    
284        protected void configureBroker(BrokerService broker) throws Exception {
285            broker.addConnector("vm://localhost");
286            broker.setDeleteAllMessagesOnStartup(true);
287        }
288    
289        protected Properties getSysTestSettings() {
290            Properties settings = new Properties();
291            settings.setProperty("domain", topic ? "topic" : "queue");
292            settings.setProperty("durable", durable ? "durable" : "non-durable");
293            settings.setProperty("connection_checkpoint_size_kb", new Integer(connectionCheckpointSize).toString());
294            settings.setProperty("producer_count", new Integer(producerCount).toString());
295            settings.setProperty("consumer_count", new Integer(consumerCount).toString());
296            settings.setProperty("message_count", new Long(messageCount).toString());
297            settings.setProperty("message_size", new Integer(messageSize).toString());
298            settings.setProperty("prefetchSize", new Integer(prefetchSize).toString());
299            settings.setProperty("checkpoint_interval", new Integer(checkpointInterval).toString());
300            settings.setProperty("destination_name", destinationName);
301            settings.setProperty("report_name", reportName);
302            settings.setProperty("report_directory", reportDirectory);
303            settings.setProperty("connection_checkpoint_size", new Integer(connectionCheckpointSize).toString());
304            return settings;
305        }
306    
307    
308    }