001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import javax.jms.Connection;
020    import javax.jms.ConnectionFactory;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Session;
024    
025    import org.apache.activemq.tool.properties.JmsClientProperties;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    public abstract class AbstractJmsClient {
030    
031        private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class);
032    
033        protected ConnectionFactory factory;
034        protected Connection jmsConnection;
035        protected Session jmsSession;
036    
037        protected int destCount = 1;
038        protected int destIndex;
039        protected String clientName = "";
040        
041        private int internalTxCounter = 0;
042    
043        public AbstractJmsClient(ConnectionFactory factory) {
044            this.factory = factory;
045        }
046    
047        public abstract JmsClientProperties getClient();
048    
049        public abstract void setClient(JmsClientProperties client);
050    
051        public ConnectionFactory getFactory() {
052            return factory;
053        }
054    
055        public void setFactory(ConnectionFactory factory) {
056            this.factory = factory;
057        }
058    
059        public int getDestCount() {
060            return destCount;
061        }
062    
063        public void setDestCount(int destCount) {
064            this.destCount = destCount;
065        }
066    
067        public int getDestIndex() {
068            return destIndex;
069        }
070    
071        public void setDestIndex(int destIndex) {
072            this.destIndex = destIndex;
073        }
074    
075        public String getClientName() {
076            return clientName;
077        }
078    
079        public void setClientName(String clientName) {
080            this.clientName = clientName;
081        }
082    
083        public Connection getConnection() throws JMSException {
084            if (jmsConnection == null) {
085                jmsConnection = factory.createConnection();
086                jmsConnection.setClientID(getClientName());
087                LOG.info("Creating JMS Connection: Provider=" + getClient().getJmsProvider() + ", JMS Spec=" + getClient().getJmsVersion());
088            }
089            return jmsConnection;
090        }
091    
092        public Session getSession() throws JMSException {
093            if (jmsSession == null) {
094                int ackMode;
095                if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_AUTO_ACKNOWLEDGE)) {
096                    ackMode = Session.AUTO_ACKNOWLEDGE;
097                } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_CLIENT_ACKNOWLEDGE)) {
098                    ackMode = Session.CLIENT_ACKNOWLEDGE;
099                } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_DUPS_OK_ACKNOWLEDGE)) {
100                    ackMode = Session.DUPS_OK_ACKNOWLEDGE;
101                } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_TRANSACTED)) {
102                    ackMode = Session.SESSION_TRANSACTED;
103                } else {
104                    ackMode = Session.AUTO_ACKNOWLEDGE;
105                }
106                jmsSession = getConnection().createSession(getClient().isSessTransacted(), ackMode);
107            }
108            return jmsSession;
109        }
110    
111        public Destination[] createDestination(int destIndex, int destCount) throws JMSException {
112    
113            if (getClient().isDestComposite()) {
114                return new Destination[] {
115                    createCompositeDestination(getClient().getDestName(), destIndex, destCount)
116                };
117            } else {
118                Destination[] dest = new Destination[destCount];
119                for (int i = 0; i < destCount; i++) {
120                    dest[i] = createDestination(getClient().getDestName() + "." + (destIndex + i));
121                }
122    
123                return dest;
124            }
125        }
126    
127        public Destination createCompositeDestination(int destIndex, int destCount) throws JMSException {
128            return createCompositeDestination(getClient().getDestName(), destIndex, destCount);
129        }
130    
131        protected Destination createCompositeDestination(String name, int destIndex, int destCount) throws JMSException {
132            String compDestName;
133            String simpleName;
134    
135            if (name.startsWith("queue://")) {
136                simpleName = name.substring("queue://".length());
137            } else if (name.startsWith("topic://")) {
138                simpleName = name.substring("topic://".length());
139            } else {
140                simpleName = name;
141            }
142    
143            int i;
144            compDestName = name + "." + destIndex + ","; // First destination
145            for (i = 1; i < destCount - 1; i++) {
146                compDestName += simpleName + "." + (destIndex + i) + ",";
147            }
148            // Last destination (minus the comma)
149            compDestName += simpleName + "." + (destIndex + i);
150    
151            return createDestination(compDestName);
152        }
153    
154        protected Destination createDestination(String name) throws JMSException {
155            if (name.startsWith("queue://")) {
156                return getSession().createQueue(name.substring("queue://".length()));
157            } else if (name.startsWith("topic://")) {
158                return getSession().createTopic(name.substring("topic://".length()));
159            } else {
160                return getSession().createTopic(name);
161            }
162        }
163    
164        /** 
165         * Helper method that checks if session is 
166         * transacted and whether to commit the tx based on commitAfterXMsgs 
167         * property. 
168         * 
169         * @return true if transaction was committed. 
170         * @throws JMSException in case the call to JMS Session.commit() fails.
171         */
172        public boolean commitTxIfNecessary() throws JMSException {
173            
174            internalTxCounter++;
175            if (getClient().isSessTransacted()) {
176                    if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) {
177                            LOG.debug("Committing transaction.");
178                            internalTxCounter = 0;
179                            getSession().commit();
180                            return true;
181                    }
182            }
183            return false;
184        }
185    }