001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import javax.annotation.PostConstruct;
020 import javax.annotation.PreDestroy;
021 import javax.jms.Connection;
022 import javax.jms.ConnectionFactory;
023 import javax.jms.Destination;
024 import javax.jms.ExceptionListener;
025 import javax.jms.JMSException;
026 import javax.jms.MessageConsumer;
027 import javax.jms.Session;
028 import org.apache.activemq.ActiveMQConnectionFactory;
029 import org.apache.activemq.Service;
030 import org.apache.activemq.advisory.AdvisorySupport;
031 import org.apache.activemq.util.ServiceStopper;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 /**
036 * An agent which listens to commands on a JMS destination
037 *
038 *
039 * @org.apache.xbean.XBean
040 */
041 public class CommandAgent implements Service, ExceptionListener {
042 private static final Logger LOG = LoggerFactory.getLogger(CommandAgent.class);
043
044 private String brokerUrl = "vm://localhost";
045 private String username;
046 private String password;
047 private ConnectionFactory connectionFactory;
048 private Connection connection;
049 private Destination commandDestination;
050 private CommandMessageListener listener;
051 private Session session;
052 private MessageConsumer consumer;
053
054 /**
055 *
056 * @throws Exception
057 * @org.apache.xbean.InitMethod
058 */
059 @PostConstruct
060 public void start() throws Exception {
061 session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
062 listener = new CommandMessageListener(session);
063 Destination destination = getCommandDestination();
064 if (LOG.isDebugEnabled()) {
065 LOG.debug("Agent subscribing to control destination: " + destination);
066 }
067 consumer = session.createConsumer(destination);
068 consumer.setMessageListener(listener);
069 }
070
071 /**
072 *
073 * @throws Exception
074 * @org.apache.xbean.DestroyMethod
075 */
076 @PreDestroy
077 public void stop() throws Exception {
078 if (consumer != null) {
079 try {
080 consumer.close();
081 consumer = null;
082 } catch (JMSException ignored) {
083 }
084 }
085 if (session != null) {
086 try {
087 session.close();
088 session = null;
089 } catch (JMSException ignored) {
090 }
091 }
092 if (connection != null) {
093 try {
094 connection.close();
095 connection = null;
096 } catch (JMSException ignored) {
097 }
098 }
099 }
100
101 // Properties
102 // -------------------------------------------------------------------------
103 public String getBrokerUrl() {
104 return brokerUrl;
105 }
106
107 public void setBrokerUrl(String brokerUrl) {
108 this.brokerUrl = brokerUrl;
109 }
110
111 public String getUsername() {
112 return username;
113 }
114
115 public void setUsername(String username) {
116 this.username = username;
117 }
118
119 public String getPassword() {
120 return password;
121 }
122
123 public void setPassword(String password) {
124 this.password = password;
125 }
126
127 public ConnectionFactory getConnectionFactory() {
128 if (connectionFactory == null) {
129 connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
130 }
131 return connectionFactory;
132 }
133
134 public void setConnectionFactory(ConnectionFactory connectionFactory) {
135 this.connectionFactory = connectionFactory;
136 }
137
138 public Connection getConnection() throws JMSException {
139 if (connection == null) {
140 connection = createConnection();
141 connection.setExceptionListener(this);
142 connection.start();
143 }
144 return connection;
145 }
146
147 public void setConnection(Connection connection) {
148 this.connection = connection;
149 }
150
151 public Destination getCommandDestination() {
152 if (commandDestination == null) {
153 commandDestination = createCommandDestination();
154 }
155 return commandDestination;
156 }
157
158 public void setCommandDestination(Destination commandDestination) {
159 this.commandDestination = commandDestination;
160 }
161
162 protected Connection createConnection() throws JMSException {
163 return getConnectionFactory().createConnection(username, password);
164 }
165
166 protected Destination createCommandDestination() {
167 return AdvisorySupport.getAgentDestination();
168 }
169
170 public void onException(JMSException exception) {
171 try {
172 stop();
173 } catch (Exception e) {
174 }
175 }
176 }