001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import javax.annotation.PostConstruct;
020    import javax.annotation.PreDestroy;
021    import javax.jms.Connection;
022    import javax.jms.ConnectionFactory;
023    import javax.jms.Destination;
024    import javax.jms.ExceptionListener;
025    import javax.jms.JMSException;
026    import javax.jms.MessageConsumer;
027    import javax.jms.Session;
028    import org.apache.activemq.ActiveMQConnectionFactory;
029    import org.apache.activemq.Service;
030    import org.apache.activemq.advisory.AdvisorySupport;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * An agent which listens to commands on a JMS destination
036     * 
037     * 
038     * @org.apache.xbean.XBean
039     */
040    public class CommandAgent implements Service, ExceptionListener {
041        private static final Logger LOG = LoggerFactory.getLogger(CommandAgent.class);
042    
043        private String brokerUrl = "vm://localhost";
044        private String username;
045        private String password;
046        private ConnectionFactory connectionFactory;
047        private Connection connection;
048        private Destination commandDestination;
049        private CommandMessageListener listener;
050        private Session session;
051        private MessageConsumer consumer;
052    
053        /**
054         *
055         * @throws Exception
056         * @org.apache.xbean.InitMethod
057         */
058        @PostConstruct
059        public void start() throws Exception {
060            session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
061            listener = new CommandMessageListener(session);
062            Destination destination = getCommandDestination();
063            if (LOG.isDebugEnabled()) {
064                LOG.debug("Agent subscribing to control destination: " + destination);
065            }
066            consumer = session.createConsumer(destination);
067            consumer.setMessageListener(listener);
068        }
069    
070        /**
071         *
072         * @throws Exception
073         * @org.apache.xbean.DestroyMethod
074         */
075        @PreDestroy
076        public void stop() throws Exception {
077            if (consumer != null) {
078                try {
079                    consumer.close();
080                    consumer = null;
081                } catch (JMSException ignored) {
082                }
083            }
084            if (session != null) {
085                try {
086                    session.close();
087                    session = null;
088                } catch (JMSException ignored) {
089                }
090            }
091            if (connection != null) {
092                try {
093                    connection.close();
094                    connection = null;
095                } catch (JMSException ignored) {
096                }
097            }
098        }
099    
100        // Properties
101        // -------------------------------------------------------------------------
102        public String getBrokerUrl() {
103            return brokerUrl;
104        }
105    
106        public void setBrokerUrl(String brokerUrl) {
107            this.brokerUrl = brokerUrl;
108        }    
109    
110        public String getUsername() {
111                    return username;
112            }
113    
114            public void setUsername(String username) {
115                    this.username = username;
116            }
117    
118            public String getPassword() {
119                    return password;
120            }
121    
122            public void setPassword(String password) {
123                    this.password = password;
124            }
125    
126            public ConnectionFactory getConnectionFactory() {
127            if (connectionFactory == null) {
128                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
129            }
130            return connectionFactory;
131        }
132    
133        public void setConnectionFactory(ConnectionFactory connectionFactory) {
134            this.connectionFactory = connectionFactory;
135        }
136    
137        public Connection getConnection() throws JMSException {
138            if (connection == null) {
139                connection = createConnection();
140                connection.setExceptionListener(this);
141                connection.start();
142            }
143            return connection;
144        }
145    
146        public void setConnection(Connection connection) {
147            this.connection = connection;
148        }
149    
150        public Destination getCommandDestination() {
151            if (commandDestination == null) {
152                commandDestination = createCommandDestination();
153            }
154            return commandDestination;
155        }
156    
157        public void setCommandDestination(Destination commandDestination) {
158            this.commandDestination = commandDestination;
159        }
160    
161        protected Connection createConnection() throws JMSException {
162            return getConnectionFactory().createConnection(username, password);
163        }
164    
165        protected Destination createCommandDestination() {
166            return AdvisorySupport.getAgentDestination();
167        }
168    
169        public void onException(JMSException exception) {
170            try {
171                stop();
172            } catch (Exception e) {
173            }
174        }
175    }