001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;

import org.apache.activemq.tool.properties.AbstractObjectProperties;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsClientSystemProperties;
import org.apache.activemq.tool.properties.JmsFactoryProperties;
import org.apache.activemq.tool.properties.ReflectionUtil;
import org.apache.activemq.tool.reports.PerformanceReportWriter;
import org.apache.activemq.tool.reports.VerbosePerfReportWriter;
import org.apache.activemq.tool.reports.XmlFilePerfReportWriter;
import org.apache.activemq.tool.sampler.CpuSamplerTask;
import org.apache.activemq.tool.sampler.ThroughputSamplerTask;
import org.apache.activemq.tool.spi.SPIConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
042    
043    public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
044        private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClientSystem.class);
045    
046        protected ThreadGroup clientThreadGroup;
047        protected ConnectionFactory jmsConnFactory;
048    
049        // Properties
050        protected JmsFactoryProperties factory = new JmsFactoryProperties();
051        protected ThroughputSamplerTask tpSampler = new ThroughputSamplerTask();
052        protected CpuSamplerTask cpuSampler = new CpuSamplerTask();
053    
054        private int clientDestIndex;
055        private int clientDestCount;
056    
057        public void runSystemTest() throws JMSException {
058            // Create connection factory
059            jmsConnFactory = loadJmsFactory(getSysTest().getSpiClass(), factory.getFactorySettings());
060    
061            setProviderMetaData(jmsConnFactory.createConnection().getMetaData(), getJmsClientProperties());
062    
063            // Create performance sampler
064            PerformanceReportWriter writer = createPerfWriter();
065            tpSampler.setPerfReportWriter(writer);
066            cpuSampler.setPerfReportWriter(writer);
067    
068            writer.openReportWriter();
069            writer.writeProperties("jvmSettings", System.getProperties());
070            writer.writeProperties("testSystemSettings", ReflectionUtil.retrieveObjectProperties(getSysTest()));
071            writer.writeProperties("jmsFactorySettings", ReflectionUtil.retrieveObjectProperties(jmsConnFactory));
072            writer.writeProperties("jmsClientSettings", ReflectionUtil.retrieveObjectProperties(getJmsClientProperties()));
073            writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
074            writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));
075    
076            clientThreadGroup = new ThreadGroup(getSysTest().getClientPrefix() + " Thread Group");
077            for (int i = 0; i < getSysTest().getNumClients(); i++) {
078                distributeDestinations(getSysTest().getDestDistro(), i, getSysTest().getNumClients(), getSysTest().getTotalDests());
079    
080                final String clientName = getSysTest().getClientPrefix() + i;
081                final int clientDestIndex = this.clientDestIndex;
082                final int clientDestCount = this.clientDestCount;
083                Thread t = new Thread(clientThreadGroup, new Runnable() {
084                    public void run() {
085                        runJmsClient(clientName, clientDestIndex, clientDestCount);
086                    }
087                });
088                t.setName(getSysTest().getClientPrefix() + i + " Thread");
089                t.start();
090            }
091    
092            // Run samplers
093            if (getSysTest().getSamplers().indexOf(JmsClientSystemProperties.SAMPLER_TP) > -1) {
094                tpSampler.startSampler();
095            }
096    
097            if (getSysTest().getSamplers().indexOf(JmsClientSystemProperties.SAMPLER_CPU) > -1) {
098                try {
099                    cpuSampler.createPlugin();
100                    cpuSampler.startSampler();
101                } catch (IOException e) {
102                    LOG.warn("Unable to start CPU sampler plugin. Reason: " + e.getMessage());
103                }
104            }
105    
106            tpSampler.waitUntilDone();
107            cpuSampler.waitUntilDone();
108    
109            writer.closeReportWriter();
110        }
111    
112        public ThroughputSamplerTask getTpSampler() {
113            return tpSampler;
114        }
115    
116        public void setTpSampler(ThroughputSamplerTask tpSampler) {
117            this.tpSampler = tpSampler;
118        }
119    
120        public CpuSamplerTask getCpuSampler() {
121            return cpuSampler;
122        }
123    
124        public void setCpuSampler(CpuSamplerTask cpuSampler) {
125            this.cpuSampler = cpuSampler;
126        }
127    
128        public JmsFactoryProperties getFactory() {
129            return factory;
130        }
131    
132        public void setFactory(JmsFactoryProperties factory) {
133            this.factory = factory;
134        }
135    
136        public abstract JmsClientSystemProperties getSysTest();
137    
138        public abstract void setSysTest(JmsClientSystemProperties sysTestProps);
139    
140        public abstract JmsClientProperties getJmsClientProperties();
141    
142        protected PerformanceReportWriter createPerfWriter() {
143            if (getSysTest().getReportType().equalsIgnoreCase(JmsClientSystemProperties.REPORT_XML_FILE)) {
144                String reportName;
145    
146                if ((reportName = getSysTest().getReportName()) == null) {
147                    reportName = getSysTest().getClientPrefix() + "_" + "numClients" + getSysTest().getNumClients() + "_" + "numDests" + getSysTest().getTotalDests() + "_" + getSysTest().getDestDistro();
148                }
149                return new XmlFilePerfReportWriter(getSysTest().getReportDir(), reportName);
150            } else if (getSysTest().getReportType().equalsIgnoreCase(JmsClientSystemProperties.REPORT_VERBOSE)) {
151                return new VerbosePerfReportWriter();
152            } else {
153                // Use verbose if unknown report type
154                return new VerbosePerfReportWriter();
155            }
156        }
157    
158        protected void distributeDestinations(String distroType, int clientIndex, int numClients, int numDests) {
159            if (distroType.equalsIgnoreCase(JmsClientSystemProperties.DEST_DISTRO_ALL)) {
160                clientDestCount = numDests;
161                clientDestIndex = 0;
162            } else if (distroType.equalsIgnoreCase(JmsClientSystemProperties.DEST_DISTRO_EQUAL)) {
163                int destPerClient = numDests / numClients;
164                // There are equal or more destinations per client
165                if (destPerClient > 0) {
166                    clientDestCount = destPerClient;
167                    clientDestIndex = destPerClient * clientIndex;
168                    // If there are more clients than destinations, share
169                    // destinations per client
170                } else {
171                    clientDestCount = 1; // At most one destination per client
172                    clientDestIndex = clientIndex % numDests;
173                }
174            } else if (distroType.equalsIgnoreCase(JmsClientSystemProperties.DEST_DISTRO_DIVIDE)) {
175                int destPerClient = numDests / numClients;
176                // There are equal or more destinations per client
177                if (destPerClient > 0) {
178                    int remain = numDests % numClients;
179                    int nextIndex;
180                    if (clientIndex < remain) {
181                        destPerClient++;
182                        nextIndex = clientIndex * destPerClient;
183                    } else {
184                        nextIndex = (clientIndex * destPerClient) + remain;
185                    }
186    
187                    clientDestCount = destPerClient;
188                    clientDestIndex = nextIndex;
189    
190                    // If there are more clients than destinations, share
191                    // destinations per client
192                } else {
193                    clientDestCount = 1; // At most one destination per client
194                    clientDestIndex = clientIndex % numDests;
195                }
196    
197                // Send to all for unknown behavior
198            } else {
199                LOG.warn("Unknown destination distribution type: " + distroType);
200                clientDestCount = numDests;
201                clientDestIndex = 0;
202            }
203        }
204    
205        protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
206            try {
207                Class spi = Class.forName(spiClass);
208                SPIConnectionFactory spiFactory = (SPIConnectionFactory)spi.newInstance();
209                ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
210                LOG.info("Created: " + jmsFactory.getClass().getName() + " using SPIConnectionFactory: " + spiFactory.getClass().getName());
211                return jmsFactory;
212            } catch (Exception e) {
213                e.printStackTrace();
214                throw new JMSException(e.getMessage());
215            }
216        }
217    
218        protected void setProviderMetaData(ConnectionMetaData metaData, JmsClientProperties props) throws JMSException {
219            props.setJmsProvider(metaData.getJMSProviderName() + "-" + metaData.getProviderVersion());
220            props.setJmsVersion(metaData.getJMSVersion());
221    
222            String jmsProperties = "";
223            Enumeration jmsProps = metaData.getJMSXPropertyNames();
224            while (jmsProps.hasMoreElements()) {
225                jmsProperties += jmsProps.nextElement().toString() + ",";
226            }
227            if (jmsProperties.length() > 0) {
228                // Remove the last comma
229                jmsProperties = jmsProperties.substring(0, jmsProperties.length() - 1);
230            }
231            props.setJmsProperties(jmsProperties);
232        }
233    
234        protected abstract void runJmsClient(String clientName, int clientDestIndex, int clientDestCount);
235    
236        protected static Properties parseStringArgs(String[] args) {
237            File configFile = null;
238            Properties props = new Properties();
239    
240            if (args == null || args.length == 0) {
241                return props; // Empty properties
242            }
243    
244            for (int i = 0; i < args.length; i++) {
245                String arg = args[i];
246                if (arg.startsWith("-D") || arg.startsWith("-d")) {
247                    arg = arg.substring(2);
248                }
249                int index = arg.indexOf("=");
250                String key = arg.substring(0, index);
251                String val = arg.substring(index + 1);
252    
253                if (key.equalsIgnoreCase("sysTest.propsConfigFile")) {
254                    if (!val.endsWith(".properties")) {
255                        val += ".properties";
256                    }
257                    configFile = new File(val);
258                }
259                props.setProperty(key, val);
260            }
261    
262            Properties fileProps = new Properties();
263            try {
264                if (configFile != null) {
265                    LOG.info("Loading properties file: " + configFile.getAbsolutePath());
266                    fileProps.load(new FileInputStream(configFile));
267                }
268            } catch (IOException e) {
269                e.printStackTrace();
270            }
271            // Overwrite file settings with command line settings
272            fileProps.putAll(props);
273            return fileProps;
274        }
275    }