001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.management.ObjectName;
026
027 import org.apache.activemq.broker.BrokerService;
028 import org.apache.activemq.broker.SslContext;
029 import org.apache.activemq.command.DiscoveryEvent;
030 import org.apache.activemq.transport.Transport;
031 import org.apache.activemq.transport.TransportFactory;
032 import org.apache.activemq.transport.discovery.DiscoveryAgent;
033 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
034 import org.apache.activemq.transport.discovery.DiscoveryListener;
035 import org.apache.activemq.util.IntrospectionSupport;
036 import org.apache.activemq.util.ServiceStopper;
037 import org.apache.activemq.util.ServiceSupport;
038 import org.apache.activemq.util.URISupport;
039 import org.slf4j.Logger;
040 import org.slf4j.LoggerFactory;
041
042 /**
043 * A network connector which uses a discovery agent to detect the remote brokers
044 * available and setup a connection to each available remote broker
045 *
046 * @org.apache.xbean.XBean element="networkConnector"
047 *
048 */
049 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
050 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class);
051
052 private DiscoveryAgent discoveryAgent;
053 private Map<String, String> parameters;
054
055 public DiscoveryNetworkConnector() {
056 }
057
058 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException {
059 setUri(discoveryURI);
060 }
061
062 public void setUri(URI discoveryURI) throws IOException {
063 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
064 try {
065 parameters = URISupport.parseParameters(discoveryURI);
066 // allow discovery agent to grab it's parameters
067 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
068 } catch (URISyntaxException e) {
069 LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
070 }
071 }
072
073 public void onServiceAdd(DiscoveryEvent event) {
074 // Ignore events once we start stopping.
075 if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
076 return;
077 }
078 String url = event.getServiceName();
079 if (url != null) {
080 URI uri;
081 try {
082 uri = new URI(url);
083 } catch (URISyntaxException e) {
084 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
085 return;
086 }
087
088 // Should we try to connect to that URI?
089 synchronized (bridges) {
090 if( bridges.containsKey(uri) ) {
091 if (LOG.isDebugEnabled()) {
092 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
093 }
094 return;
095 }
096 }
097 if (localURI.equals(uri)) {
098 if (LOG.isDebugEnabled()) {
099 LOG.debug("not connecting loopback: " + uri);
100 }
101 return;
102 }
103
104 if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("connectionFilter disallows connection to: " + uri);
107 }
108 return;
109 }
110
111 URI connectUri = uri;
112 try {
113 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
114 } catch (URISyntaxException e) {
115 LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
116 }
117
118 if (LOG.isInfoEnabled()) {
119 LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
120 }
121
122 Transport remoteTransport;
123 Transport localTransport;
124 try {
125 // Allows the transport to access the broker's ssl configuration.
126 SslContext.setCurrentSslContext(getBrokerService().getSslContext());
127 try {
128 remoteTransport = TransportFactory.connect(connectUri);
129 } catch (Exception e) {
130 LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("Connection failure exception: " + e, e);
133 }
134 return;
135 }
136 try {
137 localTransport = createLocalTransport();
138 } catch (Exception e) {
139 ServiceSupport.dispose(remoteTransport);
140 LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
141 if (LOG.isDebugEnabled()) {
142 LOG.debug("Connection failure exception: " + e, e);
143 }
144 return;
145 }
146 } finally {
147 SslContext.setCurrentSslContext(null);
148 }
149 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
150 try {
151 bridge.start();
152 synchronized (bridges) {
153 bridges.put(uri, bridge);
154 }
155 } catch (Exception e) {
156 ServiceSupport.dispose(localTransport);
157 ServiceSupport.dispose(remoteTransport);
158 LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("Start failure exception: " + e, e);
161 }
162 try {
163 discoveryAgent.serviceFailed(event);
164 } catch (IOException e1) {
165 if (LOG.isDebugEnabled()) {
166 LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
167 }
168 }
169 }
170 }
171 }
172
173 public void onServiceRemove(DiscoveryEvent event) {
174 String url = event.getServiceName();
175 if (url != null) {
176 URI uri;
177 try {
178 uri = new URI(url);
179 } catch (URISyntaxException e) {
180 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
181 return;
182 }
183
184 synchronized (bridges) {
185 bridges.remove(uri);
186 }
187 }
188 }
189
190 public DiscoveryAgent getDiscoveryAgent() {
191 return discoveryAgent;
192 }
193
194 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
195 this.discoveryAgent = discoveryAgent;
196 if (discoveryAgent != null) {
197 this.discoveryAgent.setDiscoveryListener(this);
198 }
199 }
200
201 protected void handleStart() throws Exception {
202 if (discoveryAgent == null) {
203 throw new IllegalStateException("You must configure the 'discoveryAgent' property");
204 }
205 this.discoveryAgent.start();
206 super.handleStart();
207 }
208
209 protected void handleStop(ServiceStopper stopper) throws Exception {
210 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) {
211 NetworkBridge bridge = i.next();
212 try {
213 bridge.stop();
214 } catch (Exception e) {
215 stopper.onException(this, e);
216 }
217 }
218 bridges.clear();
219 try {
220 this.discoveryAgent.stop();
221 } catch (Exception e) {
222 stopper.onException(this, e);
223 }
224
225 super.handleStop(stopper);
226 }
227
228 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
229 class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
230
231 public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
232 super(brokerService, connectorName);
233 }
234
235 public void bridgeFailed() {
236 if (!serviceSupport.isStopped()) {
237 try {
238 discoveryAgent.serviceFailed(event);
239 } catch (IOException e) {
240 }
241 }
242
243 }
244 }
245 NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
246
247 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
248 result.setBrokerService(getBrokerService());
249 return configureBridge(result);
250 }
251
252 @Override
253 public String toString() {
254 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
255 }
256 }