001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.tool;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import javax.jms.*;
023
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.tool.properties.JmsClientProperties;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029public abstract class AbstractJmsClient {
030
031    private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class);
032
033    private static final String QUEUE_SCHEME = "queue://";
034    private static final String TOPIC_SCHEME = "topic://";
035    private static final String TEMP_QUEUE_SCHEME = "temp-queue://";
036    private static final String TEMP_TOPIC_SCHEME = "temp-topic://";
037    public static final String DESTINATION_SEPARATOR = ",";
038
039    protected ConnectionFactory factory;
040    protected Connection jmsConnection;
041    protected Session jmsSession;
042
043    protected int destCount = 1;
044    protected int destIndex;
045    protected String clientName = "";
046
047    private int internalTxCounter = 0;
048
049    public AbstractJmsClient(ConnectionFactory factory) {
050        this.factory = factory;
051    }
052
053    public abstract JmsClientProperties getClient();
054
055    public abstract void setClient(JmsClientProperties client);
056
057    public ConnectionFactory getFactory() {
058        return factory;
059    }
060
061    public void setFactory(ConnectionFactory factory) {
062        this.factory = factory;
063    }
064
065    public int getDestCount() {
066        return destCount;
067    }
068
069    public void setDestCount(int destCount) {
070        this.destCount = destCount;
071    }
072
073    public int getDestIndex() {
074        return destIndex;
075    }
076
077    public void setDestIndex(int destIndex) {
078        this.destIndex = destIndex;
079    }
080
081    public String getClientName() {
082        return clientName;
083    }
084
085    public void setClientName(String clientName) {
086        this.clientName = clientName;
087    }
088
089    public Connection getConnection() throws JMSException {
090        if (jmsConnection == null) {
091            jmsConnection = factory.createConnection();
092            jmsConnection.setClientID(getClientName());
093            LOG.info("Creating JMS Connection: Provider=" + getClient().getJmsProvider() + ", JMS Spec=" + getClient().getJmsVersion());
094        }
095        return jmsConnection;
096    }
097
098    public Session getSession() throws JMSException {
099        if (jmsSession == null) {
100            int ackMode;
101            if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_AUTO_ACKNOWLEDGE)) {
102                ackMode = Session.AUTO_ACKNOWLEDGE;
103            } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_CLIENT_ACKNOWLEDGE)) {
104                ackMode = Session.CLIENT_ACKNOWLEDGE;
105            } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_DUPS_OK_ACKNOWLEDGE)) {
106                ackMode = Session.DUPS_OK_ACKNOWLEDGE;
107            } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_TRANSACTED)) {
108                ackMode = Session.SESSION_TRANSACTED;
109            } else {
110                ackMode = Session.AUTO_ACKNOWLEDGE;
111            }
112            jmsSession = getConnection().createSession(getClient().isSessTransacted(), ackMode);
113        }
114        return jmsSession;
115    }
116
117    public Destination[] createDestinations(int destCount) throws JMSException {
118        final String destName = getClient().getDestName();
119        ArrayList<Destination> destinations = new ArrayList<>();
120        if (destName.contains(DESTINATION_SEPARATOR)) {
121            if (getClient().isDestComposite() && (destCount == 1)) {
122                // user was explicit about which destinations to make composite
123                String[] simpleNames = mapToSimpleNames(destName.split(DESTINATION_SEPARATOR));
124                String joinedSimpleNames = join(simpleNames, DESTINATION_SEPARATOR);
125
126                // use the type of the 1st destination for the Destination instance
127                byte destinationType = getDestinationType(destName);
128                destinations.add(createCompositeDestination(destinationType, joinedSimpleNames, 1));
129            } else {
130                LOG.info("User requested multiple destinations, splitting: {}", destName);
131                // either composite with multiple destinations to be suffixed
132                // or multiple non-composite destinations
133                String[] destinationNames = destName.split(DESTINATION_SEPARATOR);
134                for (String splitDestName : destinationNames) {
135                    addDestinations(destinations, splitDestName, destCount);
136                }
137            }
138        } else {
139            addDestinations(destinations, destName, destCount);
140        }
141        return destinations.toArray(new Destination[] {});
142    }
143
144    private String join(String[] stings, String separator) {
145        StringBuffer sb = new StringBuffer();
146        for (int i = 0; i < stings.length; i++) {
147            if (i > 0) {
148                sb.append(separator);
149            }
150            sb.append(stings[i]);
151        }
152        return sb.toString();
153    }
154
155    private void addDestinations(List<Destination> destinations, String destName, int destCount) throws JMSException {
156        boolean destComposite = getClient().isDestComposite();
157        if ((destComposite) && (destCount > 1)) {
158            destinations.add(createCompositeDestination(destName, destCount));
159        } else {
160            for (int i = 0; i < destCount; i++) {
161                destinations.add(createDestination(withDestinationSuffix(destName, i, destCount)));
162            }
163        }
164    }
165
166    private String withDestinationSuffix(String name, int destIndex, int destCount) {
167        return (destCount == 1) ? name : name + "." + destIndex;
168    }
169
170    protected Destination createCompositeDestination(String destName, int destCount) throws JMSException {
171        return createCompositeDestination(getDestinationType(destName), destName, destCount);
172    }
173
174    protected Destination createCompositeDestination(byte destinationType, String destName, int destCount) throws JMSException {
175        String simpleName = getSimpleName(destName);
176
177        String compDestName = "";
178        for (int i = 0; i < destCount; i++) {
179            if (i > 0) {
180                compDestName += ",";
181            }
182            compDestName += withDestinationSuffix(simpleName, i, destCount);
183        }
184
185        LOG.info("Creating composite destination: {}", compDestName);
186        Destination destination;
187        Session session = getSession();
188        if (destinationType == ActiveMQDestination.TOPIC_TYPE) {
189            destination = session.createTopic(compDestName);
190        } else if (destinationType == ActiveMQDestination.QUEUE_TYPE) {
191            destination = session.createQueue(compDestName);
192        } else {
193            throw new UnsupportedOperationException(
194                    "Cannot create composite destinations using temporary queues or topics.");
195        }
196        assert (destination != null);
197        return destination;
198    }
199
200    private String[] mapToSimpleNames(String[] destNames) {
201        assert (destNames != null);
202        String[] simpleNames = new String[destNames.length];
203        for (int i = 0; i < destNames.length; i++) {
204            simpleNames[i] = getSimpleName(destNames[i]);
205        }
206        return simpleNames;
207    }
208
209    protected String getSimpleName(String destName) {
210        String simpleName;
211        if (destName.startsWith(QUEUE_SCHEME)) {
212            simpleName = destName.substring(QUEUE_SCHEME.length());
213        } else if (destName.startsWith(TOPIC_SCHEME)) {
214            simpleName = destName.substring(TOPIC_SCHEME.length());
215        } else if (destName.startsWith(TEMP_QUEUE_SCHEME)) {
216            simpleName = destName.substring(TEMP_QUEUE_SCHEME.length());
217        } else if (destName.startsWith(TEMP_TOPIC_SCHEME)) {
218            simpleName = destName.substring(TEMP_TOPIC_SCHEME.length());
219        } else {
220            simpleName = destName;
221        }
222        return simpleName;
223    }
224
225    protected byte getDestinationType(String destName) {
226        assert (destName != null);
227        if (destName.startsWith(QUEUE_SCHEME)) {
228            return ActiveMQDestination.QUEUE_TYPE;
229        } else if (destName.startsWith(TEMP_QUEUE_SCHEME)) {
230            return ActiveMQDestination.TEMP_QUEUE_TYPE;
231        } else if (destName.startsWith(TEMP_TOPIC_SCHEME)) {
232            return ActiveMQDestination.TEMP_TOPIC_TYPE;
233        } else {
234            return ActiveMQDestination.TOPIC_TYPE;
235        }
236    }
237
238    protected Destination createDestination(String destName) throws JMSException {
239        String simpleName = getSimpleName(destName);
240        byte destinationType = getDestinationType(destName);
241
242        if (destinationType == ActiveMQDestination.QUEUE_TYPE) {
243            LOG.info("Creating queue: {}", destName);
244            return getSession().createQueue(simpleName);
245        } else if (destinationType == ActiveMQDestination.TOPIC_TYPE) {
246            LOG.info("Creating topic: {}", destName);
247            return getSession().createTopic(simpleName);
248        } else {
249            return createTemporaryDestination(destName);
250        }
251    }
252
253    protected Destination createTemporaryDestination(String destName) throws JMSException {
254        byte destinationType = getDestinationType(destName);
255
256        if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) {
257            LOG.warn("Creating temporary queue. Requested name ({}) ignored.", destName);
258            TemporaryQueue temporaryQueue = getSession().createTemporaryQueue();
259            LOG.info("Temporary queue created: {}", temporaryQueue.getQueueName());
260            return temporaryQueue;
261        } else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) {
262            LOG.warn("Creating temporary topic. Requested name ({}) ignored.", destName);
263            TemporaryTopic temporaryTopic = getSession().createTemporaryTopic();
264            LOG.info("Temporary topic created: {}", temporaryTopic.getTopicName());
265            return temporaryTopic;
266        } else {
267            throw new IllegalArgumentException("Unrecognized destination type: " + destinationType);
268        }
269    }
270
271    /**
272     * Helper method that checks if session is
273     * transacted and whether to commit the tx based on commitAfterXMsgs
274     * property.
275     *
276     * @return true if transaction was committed.
277     * @throws JMSException in case the call to JMS Session.commit() fails.
278     */
279    public boolean commitTxIfNecessary() throws JMSException {
280        internalTxCounter++;
281        if (getClient().isSessTransacted()) {
282            if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) {
283                LOG.debug("Committing transaction.");
284                internalTxCounter = 0;
285                getSession().commit();
286                return true;
287            }
288        }
289        return false;
290    }
291}