001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import java.io.IOException;
020    
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageListener;
025    import javax.jms.MessageProducer;
026    import javax.jms.Session;
027    import javax.jms.TextMessage;
028    
029    import org.apache.activemq.command.ActiveMQTextMessage;
030    import org.apache.activemq.util.FactoryFinder;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * 
036     */
037    public class CommandMessageListener implements MessageListener {
038        private static final Logger LOG = LoggerFactory.getLogger(CommandMessageListener.class);
039    
040        private Session session;
041        private MessageProducer producer;
042        private CommandHandler handler;
043    
044        public CommandMessageListener(Session session) {
045            this.session = session;
046        }
047    
048        public void onMessage(Message message) {
049            if (LOG.isDebugEnabled()) {
050                LOG.debug("Received command: " + message);
051            }
052            if (message instanceof TextMessage) {
053                TextMessage request = (TextMessage)message;
054                try {
055                    Destination replyTo = message.getJMSReplyTo();
056                    if (replyTo == null) {
057                        LOG.warn("Ignored message as no JMSReplyTo set: " + message);
058                        return;
059                    }
060                    Message response = processCommand(request);
061                    addReplyHeaders(request, response);
062                    getProducer().send(replyTo, response);
063                } catch (Exception e) {
064                    LOG.error("Failed to process message due to: " + e + ". Message: " + message, e);
065                }
066            } else {
067                LOG.warn("Ignoring invalid message: " + message);
068            }
069        }
070    
071        protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
072            String correlationID = request.getJMSCorrelationID();
073            if (correlationID != null) {
074                response.setJMSCorrelationID(correlationID);
075            }
076        }
077    
078        /**
079         * Processes an incoming JMS message returning the response message
080         */
081        public Message processCommand(TextMessage request) throws Exception {
082            TextMessage response = session.createTextMessage();
083            getHandler().processCommand(request, response);
084            return response;
085        }
086    
087        /**
088         * Processes an incoming command from a console and returning the text to
089         * output
090         */
091        public String processCommandText(String line) throws Exception {
092            TextMessage request = new ActiveMQTextMessage();
093            request.setText(line);
094            TextMessage response = new ActiveMQTextMessage();
095            getHandler().processCommand(request, response);
096            return response.getText();
097        }
098    
099        public Session getSession() {
100            return session;
101        }
102    
103        public MessageProducer getProducer() throws JMSException {
104            if (producer == null) {
105                producer = getSession().createProducer(null);
106            }
107            return producer;
108        }
109    
110        public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
111            if (handler == null) {
112                handler = createHandler();
113            }
114            return handler;
115        }
116    
117        private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
118            FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
119            return (CommandHandler)factoryFinder.newInstance("agent");
120        }
121    }