001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.tool;
019
020import java.util.Properties;
021
022import javax.jms.BytesMessage;
023import javax.jms.Connection;
024import javax.jms.ConnectionFactory;
025import javax.jms.DeliveryMode;
026import javax.jms.Destination;
027import javax.jms.JMSException;
028import javax.jms.Message;
029import javax.jms.Session;
030
031import org.apache.activemq.ActiveMQConnectionFactory;
032import org.apache.activemq.broker.BrokerService;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036
037public class JMSMemtest {
038
039    private static final Logger LOG = LoggerFactory.getLogger(JMSMemtest.class);
040    private static final int DEFAULT_MESSAGECOUNT = 5000;
041    
042    protected BrokerService broker;
043    protected boolean topic = true;
044    protected boolean durable;
045    protected long messageCount;
046
047    //  how large the message in kb before we close/start the producer/consumer with a new connection.  -1 means no connectionCheckpointSize
048    protected int connectionCheckpointSize;
049    protected long connectionInterval;
050
051
052    protected int consumerCount;
053    protected int producerCount;
054    protected int checkpointInterval;
055    protected int prefetchSize;
056    //set 10 kb of payload as default
057    protected int messageSize;
058
059    protected String reportDirectory;
060    protected String reportName;
061
062
063    protected String url = "";
064    protected MemProducer[] producers;
065    protected MemConsumer[] consumers;
066    protected String destinationName;
067    protected boolean allMessagesConsumed = true;
068    protected MemConsumer allMessagesList = new MemConsumer();
069
070    protected Message payload;
071
072    protected ActiveMQConnectionFactory connectionFactory;
073    protected Connection connection;
074    protected Destination destination;
075
076
077    protected boolean createConnectionPerClient = true;
078
079    protected boolean transacted;
080    protected boolean useEmbeddedBroker = true;
081    protected MemoryMonitoringTool memoryMonitoringTool;
082
083    public JMSMemtest(Properties settings) {
084        url = settings.getProperty("url");
085        topic = new Boolean(settings.getProperty("topic")).booleanValue();
086        durable = new Boolean(settings.getProperty("durable")).booleanValue();
087        connectionCheckpointSize = new Integer(settings.getProperty("connectionCheckpointSize")).intValue();
088        producerCount = new Integer(settings.getProperty("producerCount")).intValue();
089        consumerCount = new Integer(settings.getProperty("consumerCount")).intValue();
090        messageCount = new Integer(settings.getProperty("messageCount")).intValue();
091        messageSize = new Integer(settings.getProperty("messageSize")).intValue();
092        prefetchSize = new Integer(settings.getProperty("prefetchSize")).intValue();
093        checkpointInterval = new Integer(settings.getProperty("checkpointInterval")).intValue() * 1000;
094        producerCount = new Integer(settings.getProperty("producerCount")).intValue();
095        reportName = settings.getProperty("reportName");
096        destinationName = settings.getProperty("destinationName");
097        reportDirectory = settings.getProperty("reportDirectory");
098        connectionInterval = connectionCheckpointSize * 1024;
099    }
100
101    public static void main(String[] args) {
102
103
104        Properties sysSettings = new Properties();
105
106        for (int i = 0; i < args.length; i++) {
107
108            int index = args[i].indexOf("=");
109            String key = args[i].substring(0, index);
110            String val = args[i].substring(index + 1);
111            sysSettings.setProperty(key, val);
112
113        }
114
115
116        JMSMemtest memtest = new JMSMemtest(sysSettings);
117        try {
118            memtest.start();
119        } catch (Exception e) {
120
121            e.printStackTrace();
122        }
123
124    }
125
126    protected void start() throws Exception {
127        LOG.info("Starting Monitor");
128        memoryMonitoringTool = new MemoryMonitoringTool();
129        memoryMonitoringTool.setTestSettings(getSysTestSettings());
130        Thread monitorThread = memoryMonitoringTool.startMonitor();
131
132        if (messageCount == 0) {
133            messageCount = DEFAULT_MESSAGECOUNT;
134        }
135
136
137        if (useEmbeddedBroker) {
138            if (broker == null) {
139                broker = createBroker();
140            }
141        }
142
143
144        connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory();
145        if (prefetchSize > 0) {
146            connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
147            connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
148        }
149
150        connection = connectionFactory.createConnection();
151        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
152
153        if (topic) {
154            destination = session.createTopic(destinationName);
155        } else {
156            destination = session.createQueue(destinationName);
157        }
158
159        createPayload(session);
160
161        publishAndConsume();
162
163        LOG.info("Closing resources");
164        this.close();
165
166        monitorThread.join();
167
168
169    }
170
171
172    protected boolean resetConnection(int counter) {
173        if (connectionInterval > 0) {
174            long totalMsgSizeConsumed = counter * 1024;
175            if (connectionInterval < totalMsgSizeConsumed) {
176                return true;
177            }
178        }
179        return false;
180    }
181
182    protected void publishAndConsume() throws Exception {
183
184        createConsumers();
185        createProducers();
186        int counter = 0;
187        boolean resetCon = false;
188        LOG.info("Start sending messages ");
189        for (int i = 0; i < messageCount; i++) {
190            if (resetCon) {
191                closeConsumers();
192                closeProducers();
193                createConsumers();
194                createProducers();
195                resetCon = false;
196            }
197
198            for (int k = 0; k < producers.length; k++) {
199                producers[k].sendMessage(payload, "counter", counter);
200                counter++;
201                if (resetConnection(counter)) {
202                    resetCon = true;
203                    break;
204                }
205            }
206        }
207    }
208
209
210    protected void close() throws Exception {
211        connection.close();
212        broker.stop();
213
214        memoryMonitoringTool.stopMonitor();
215    }
216
217    protected void createPayload(Session session) throws JMSException {
218
219        byte[] array = new byte[messageSize];
220        for (int i = 0; i < array.length; i++) {
221            array[i] = (byte) i;
222        }
223
224        BytesMessage bystePayload = session.createBytesMessage();
225        bystePayload.writeBytes(array);
226        payload = (Message) bystePayload;
227    }
228
229
230    protected void createProducers() throws JMSException {
231        producers = new MemProducer[producerCount];
232        for (int i = 0; i < producerCount; i++) {
233            producers[i] = new MemProducer(connectionFactory, destination);
234            if (durable) {
235                producers[i].setDeliveryMode(DeliveryMode.PERSISTENT);
236            } else {
237                producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
238            }
239            producers[i].start();
240        }
241
242    }
243
244    protected void createConsumers() throws JMSException {
245        consumers = new MemConsumer[consumerCount];
246        for (int i = 0; i < consumerCount; i++) {
247            consumers[i] = new MemConsumer(connectionFactory, destination);
248            consumers[i].setParent(allMessagesList);
249            consumers[i].start();
250
251
252        }
253    }
254
255    protected void closeProducers() throws JMSException {
256        for (int i = 0; i < producerCount; i++) {
257            producers[i].shutDown();
258        }
259
260    }
261
262    protected void closeConsumers() throws JMSException {
263        for (int i = 0; i < consumerCount; i++) {
264            consumers[i].shutDown();
265        }
266    }
267
268    protected ConnectionFactory createConnectionFactory() throws JMSException {
269
270        if (url == null || url.trim().equals("") || url.trim().equals("null")) {
271            return new ActiveMQConnectionFactory("vm://localhost");
272        } else {
273            return new ActiveMQConnectionFactory(url);
274        }
275    }
276
277    protected BrokerService createBroker() throws Exception {
278        BrokerService broker = new BrokerService();
279        configureBroker(broker);
280        broker.start();
281        return broker;
282    }
283
284    protected void configureBroker(BrokerService broker) throws Exception {
285        broker.addConnector("vm://localhost");
286        broker.setDeleteAllMessagesOnStartup(true);
287    }
288
289    protected Properties getSysTestSettings() {
290        Properties settings = new Properties();
291        settings.setProperty("domain", topic ? "topic" : "queue");
292        settings.setProperty("durable", durable ? "durable" : "non-durable");
293        settings.setProperty("connection_checkpoint_size_kb", new Integer(connectionCheckpointSize).toString());
294        settings.setProperty("producer_count", new Integer(producerCount).toString());
295        settings.setProperty("consumer_count", new Integer(consumerCount).toString());
296        settings.setProperty("message_count", new Long(messageCount).toString());
297        settings.setProperty("message_size", new Integer(messageSize).toString());
298        settings.setProperty("prefetchSize", new Integer(prefetchSize).toString());
299        settings.setProperty("checkpoint_interval", new Integer(checkpointInterval).toString());
300        settings.setProperty("destination_name", destinationName);
301        settings.setProperty("report_name", reportName);
302        settings.setProperty("report_directory", reportDirectory);
303        settings.setProperty("connection_checkpoint_size", new Integer(connectionCheckpointSize).toString());
304        return settings;
305    }
306
307
308}