001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.web;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import javax.management.MBeanServerConnection;
029    import javax.management.MBeanServerInvocationHandler;
030    import javax.management.MalformedObjectNameException;
031    import javax.management.ObjectName;
032    import javax.management.QueryExp;
033    import javax.management.remote.JMXConnector;
034    import javax.management.remote.JMXConnectorFactory;
035    import javax.management.remote.JMXServiceURL;
036    
037    import org.apache.activemq.broker.jmx.BrokerViewMBean;
038    import org.apache.activemq.broker.jmx.ManagementContext;
039    import org.apache.activemq.broker.jmx.QueueViewMBean;
040    import org.apache.activemq.command.ActiveMQDestination;
041    import org.apache.activemq.web.config.WebConsoleConfiguration;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * A {@link BrokerFacade} which uses a JMX-Connection to communicate with a
047     * broker
048     * 
049     * 
050     */
051    public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
052        
053        private static final transient Logger LOG = LoggerFactory.getLogger(RemoteJMXBrokerFacade.class);
054        
055        private String brokerName;
056        private JMXConnector connector;
057        private WebConsoleConfiguration configuration;
058    
059        public void setBrokerName(String brokerName) {
060            this.brokerName = brokerName;
061        }
062    
063        public WebConsoleConfiguration getConfiguration() {
064                    return configuration;
065            }
066    
067            public void setConfiguration(WebConsoleConfiguration configuration) {
068                    this.configuration = configuration;
069            }
070    
071            /**
072         * Shutdown this facade aka close any open connection.
073         */
074        public void shutdown() {
075            closeConnection();
076        }
077    
078        private ObjectName getBrokerObjectName(MBeanServerConnection connection)
079                            throws IOException, MalformedObjectNameException {
080                    Set<ObjectName> brokers = findBrokers(connection);
081                    if (brokers.size() == 0) {
082                            throw new IOException("No broker could be found in the JMX.");
083                    }
084                    ObjectName name = brokers.iterator().next();
085                    return name;
086            }
087    
088        public BrokerViewMBean getBrokerAdmin() throws Exception {
089            MBeanServerConnection connection = getMBeanServerConnection();
090    
091            Set brokers = findBrokers(connection);
092            if (brokers.size() == 0) {
093                throw new IOException("No broker could be found in the JMX.");
094            }
095            ObjectName name = (ObjectName)brokers.iterator().next();
096            BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);
097            return mbean;
098        }
099    
100        public String getBrokerName() throws Exception,
101                            MalformedObjectNameException {
102            return getBrokerAdmin().getBrokerName();
103        }
104        
105        protected MBeanServerConnection getMBeanServerConnection() throws Exception {
106            JMXConnector connector = this.connector;
107            if (isConnectionActive(connector)) {
108                return connector.getMBeanServerConnection();
109            }
110    
111            synchronized (this) {
112                closeConnection();
113    
114                LOG.debug("Creating a new JMX-Connection to the broker");
115                this.connector = createConnection();
116                return this.connector.getMBeanServerConnection();
117            }
118        }
119    
120        protected boolean isConnectionActive(JMXConnector connector) {
121            if (connector == null) {
122                return false;
123            }
124    
125            try {
126                MBeanServerConnection connection = connector.getMBeanServerConnection();
127                int brokerCount = findBrokers(connection).size();
128                return brokerCount > 0;
129            } catch (Exception e) {
130                return false;
131            }
132        }
133    
134        protected JMXConnector createConnection() {
135    
136            Map<String, Object> env = new HashMap<String, Object>();
137                    if (this.configuration.getJmxUser() != null) {
138                            env.put("jmx.remote.credentials", new String[] {
139                                            this.configuration.getJmxUser(),
140                                            this.configuration.getJmxPassword() });
141                    }
142            Collection<JMXServiceURL> jmxUrls = this.configuration.getJmxUrls();
143    
144            Exception exception = null;
145                    for (JMXServiceURL url : jmxUrls) {
146                            try {
147                                    JMXConnector connector = JMXConnectorFactory.connect(url, env);
148                                    connector.connect();
149                                    MBeanServerConnection connection = connector
150                                                    .getMBeanServerConnection();
151    
152                                    Set<ObjectName> brokers = findBrokers(connection);
153                                    if (brokers.size() > 0) {
154                                            LOG.info("Connected via JMX to the broker at " + url);
155                                            return connector;
156                                    }
157                            } catch (Exception e) {
158                                    // Keep the exception for later
159                                    exception = e;
160                            }
161                    }
162                    if (exception != null) {
163                            if (exception instanceof RuntimeException) {
164                                    throw (RuntimeException) exception;
165                            } else {
166                                    throw new RuntimeException(exception);
167                            }
168                    }
169                    throw new IllegalStateException("No broker is found at any of the "
170                                    + jmxUrls.size() + " configured urls");
171            }
172    
173        protected synchronized void closeConnection() {
174            if (connector != null) {
175                try {
176                    LOG.debug("Closing a connection to a broker (" + connector.getConnectionId() + ")");
177    
178                    connector.close();
179                } catch (IOException e) {
180                    // Ignore the exception, since it most likly won't matter
181                    // anymore
182                }
183            }
184        }
185    
186            /**
187             * Finds all ActiveMQ-Brokers registered on a certain JMX-Server or, if a
188             * JMX-BrokerName has been set, the broker with that name.
189             * 
190             * @param connection
191             *            not <code>null</code>
192             * @return Set with ObjectName-elements
193             * @throws IOException
194             * @throws MalformedObjectNameException
195             */
196            @SuppressWarnings("unchecked")
197            protected Set<ObjectName> findBrokers(MBeanServerConnection connection)
198                            throws IOException, MalformedObjectNameException {
199                    ObjectName name;
200                    if (this.brokerName == null) {
201                            name = new ObjectName("org.apache.activemq:type=Broker,brokerName=*");
202                    } else {
203                            name = new ObjectName("org.apache.activemq:brokerName="
204                                            + this.brokerName + ",Type=broker");
205                    }
206    
207                    Set<ObjectName> brokers = connection.queryNames(name, null);
208                    Set<ObjectName> masterBrokers = new HashSet<ObjectName>();
209                    for (ObjectName objectName : brokers) {
210                            BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, objectName, BrokerViewMBean.class, true);
211                            if (!mbean.isSlave()) masterBrokers.add(objectName);
212                    }
213                    return masterBrokers;
214            }
215            
216            public void purgeQueue(ActiveMQDestination destination) throws Exception {
217                    QueueViewMBean queue = getQueue(destination.getPhysicalName());
218                    queue.purge();
219            }
220            
221            public ManagementContext getManagementContext() {
222                    throw new IllegalStateException("not supported");
223            }
224    
225            
226            @SuppressWarnings("unchecked")
227            protected <T> Collection<T> getManagedObjects(ObjectName[] names,
228                            Class<T> type) {
229                    MBeanServerConnection connection;
230                    try {
231                            connection = getMBeanServerConnection();
232                    } catch (Exception e) {
233                            throw new RuntimeException(e);
234                    }
235    
236                    List<T> answer = new ArrayList<T>();
237                    if (connection != null) {
238                            for (int i = 0; i < names.length; i++) {
239                                    ObjectName name = names[i];
240                                    T value = (T) MBeanServerInvocationHandler.newProxyInstance(
241                                                    connection, name, type, true);
242                                    if (value != null) {
243                                            answer.add(value);
244                                    }
245                            }
246                    }
247                    return answer;
248        }
249    
250        @Override
251        public Set queryNames(ObjectName name, QueryExp query) throws Exception {
252            return getMBeanServerConnection().queryNames(name, query);
253        }
254    
255        @Override
256        public Object newProxyInstance(ObjectName objectName, Class interfaceClass,boolean notificationBroadcaster) throws Exception {
257            return MBeanServerInvocationHandler.newProxyInstance(getMBeanServerConnection(), objectName, interfaceClass, notificationBroadcaster);
258        }
259    
260    }