001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import javax.annotation.PostConstruct;
020    import javax.annotation.PreDestroy;
021    import javax.jms.Connection;
022    import javax.jms.ConnectionFactory;
023    import javax.jms.Destination;
024    import javax.jms.ExceptionListener;
025    import javax.jms.JMSException;
026    import javax.jms.MessageConsumer;
027    import javax.jms.Session;
028    import org.apache.activemq.ActiveMQConnectionFactory;
029    import org.apache.activemq.Service;
030    import org.apache.activemq.advisory.AdvisorySupport;
031    import org.apache.activemq.util.ServiceStopper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * An agent which listens to commands on a JMS destination
037     * 
038     * 
039     * @org.apache.xbean.XBean
040     */
041    public class CommandAgent implements Service, ExceptionListener {
042        private static final Logger LOG = LoggerFactory.getLogger(CommandAgent.class);
043    
044        private String brokerUrl = "vm://localhost";
045        private String username;
046        private String password;
047        private ConnectionFactory connectionFactory;
048        private Connection connection;
049        private Destination commandDestination;
050        private CommandMessageListener listener;
051        private Session session;
052        private MessageConsumer consumer;
053    
054        /**
055         *
056         * @throws Exception
057         * @org.apache.xbean.InitMethod
058         */
059        @PostConstruct
060        public void start() throws Exception {
061            session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
062            listener = new CommandMessageListener(session);
063            Destination destination = getCommandDestination();
064            if (LOG.isDebugEnabled()) {
065                LOG.debug("Agent subscribing to control destination: " + destination);
066            }
067            consumer = session.createConsumer(destination);
068            consumer.setMessageListener(listener);
069        }
070    
071        /**
072         *
073         * @throws Exception
074         * @org.apache.xbean.DestroyMethod
075         */
076        @PreDestroy
077        public void stop() throws Exception {
078            if (consumer != null) {
079                try {
080                    consumer.close();
081                    consumer = null;
082                } catch (JMSException ignored) {
083                }
084            }
085            if (session != null) {
086                try {
087                    session.close();
088                    session = null;
089                } catch (JMSException ignored) {
090                }
091            }
092            if (connection != null) {
093                try {
094                    connection.close();
095                    connection = null;
096                } catch (JMSException ignored) {
097                }
098            }
099        }
100    
101        // Properties
102        // -------------------------------------------------------------------------
103        public String getBrokerUrl() {
104            return brokerUrl;
105        }
106    
107        public void setBrokerUrl(String brokerUrl) {
108            this.brokerUrl = brokerUrl;
109        }    
110    
111        public String getUsername() {
112                    return username;
113            }
114    
115            public void setUsername(String username) {
116                    this.username = username;
117            }
118    
119            public String getPassword() {
120                    return password;
121            }
122    
123            public void setPassword(String password) {
124                    this.password = password;
125            }
126    
127            public ConnectionFactory getConnectionFactory() {
128            if (connectionFactory == null) {
129                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
130            }
131            return connectionFactory;
132        }
133    
134        public void setConnectionFactory(ConnectionFactory connectionFactory) {
135            this.connectionFactory = connectionFactory;
136        }
137    
138        public Connection getConnection() throws JMSException {
139            if (connection == null) {
140                connection = createConnection();
141                connection.setExceptionListener(this);
142                connection.start();
143            }
144            return connection;
145        }
146    
147        public void setConnection(Connection connection) {
148            this.connection = connection;
149        }
150    
151        public Destination getCommandDestination() {
152            if (commandDestination == null) {
153                commandDestination = createCommandDestination();
154            }
155            return commandDestination;
156        }
157    
158        public void setCommandDestination(Destination commandDestination) {
159            this.commandDestination = commandDestination;
160        }
161    
162        protected Connection createConnection() throws JMSException {
163            return getConnectionFactory().createConnection(username, password);
164        }
165    
166        protected Destination createCommandDestination() {
167            return AdvisorySupport.getAgentDestination();
168        }
169    
170        public void onException(JMSException exception) {
171            try {
172                stop();
173            } catch (Exception e) {
174            }
175        }
176    }