001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.net.URI;
020 import java.util.Hashtable;
021 import java.util.Map;
022 import java.util.concurrent.ConcurrentHashMap;
023
024 import javax.naming.CommunicationException;
025 import javax.naming.Context;
026 import javax.naming.NamingEnumeration;
027 import javax.naming.directory.Attributes;
028 import javax.naming.directory.DirContext;
029 import javax.naming.directory.InitialDirContext;
030 import javax.naming.directory.SearchControls;
031 import javax.naming.directory.SearchResult;
032 import javax.naming.event.EventDirContext;
033 import javax.naming.event.NamespaceChangeListener;
034 import javax.naming.event.NamingEvent;
035 import javax.naming.event.NamingExceptionEvent;
036 import javax.naming.event.ObjectChangeListener;
037
038 import org.apache.activemq.util.URISupport;
039 import org.apache.activemq.util.URISupport.CompositeData;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042
043 /**
044 * class to create dynamic network connectors listed in an directory server
045 * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the
046 * directory server must implement the ipHost and ipService objectClasses as
047 * defined in RFC 2307.
048 *
049 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
050 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
051 *
052 * @org.apache.xbean.XBean element="ldapNetworkConnector"
053 */
054 public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener {
055 private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
056
057 // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
058 private static final String REQUIRED_OBJECT_CLASS_FILTER =
059 "(&(objectClass=ipHost)(objectClass=ipService))";
060
061 // connection
062 private URI[] availableURIs = null;
063 private int availableURIsIndex = 0;
064 private String base = null;
065 private boolean failover = false;
066 private long curReconnectDelay = 1000; /* 1 sec */
067 private long maxReconnectDelay = 30000; /* 30 sec */
068
069 // authentication
070 private String user = null;
071 private String password = null;
072 private boolean anonymousAuthentication = false;
073
074 // search
075 private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
076 private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
077 private boolean searchEventListener = false;
078
079 // connector management
080 private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
081 private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
082 private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();
083
084 // local context
085 private DirContext context = null;
086 // currently in use URI
087 private URI ldapURI = null;
088
089 /**
090 * returns the next URI from the configured list
091 *
092 * @return random URI from the configured list
093 */
094 public URI getUri() {
095 return availableURIs[++availableURIsIndex % availableURIs.length];
096 }
097
098 /**
099 * sets the LDAP server URI
100 *
101 * @param _uri
102 * LDAP server URI
103 */
104 public void setUri(URI uri) throws Exception {
105 CompositeData data = URISupport.parseComposite(uri);
106 if (data.getScheme().equals("failover")) {
107 availableURIs = data.getComponents();
108 failover = true;
109 } else {
110 availableURIs = new URI[] { uri };
111 }
112 }
113
114 /**
115 * sets the base LDAP dn used for lookup operations
116 *
117 * @param _base
118 * LDAP base dn
119 */
120 public void setBase(String base) {
121 this.base = base;
122 }
123
124 /**
125 * sets the LDAP user for access credentials
126 *
127 * @param _user
128 * LDAP dn of user
129 */
130 public void setUser(String user) {
131 this.user = user;
132 }
133
134 /**
135 * sets the LDAP password for access credentials
136 *
137 * @param _password
138 * user password
139 */
140 public void setPassword(String password) {
141 this.password = password;
142 }
143
144 /**
145 * sets LDAP anonymous authentication access credentials
146 *
147 * @param _anonymousAuthentication
148 * set to true to use anonymous authentication
149 */
150 public void setAnonymousAuthentication(boolean anonymousAuthentication) {
151 this.anonymousAuthentication = anonymousAuthentication;
152 }
153
154 /**
155 * sets the LDAP search scope
156 *
157 * @param _searchScope
158 * LDAP JNDI search scope
159 */
160 public void setSearchScope(String searchScope) throws Exception {
161 int scope;
162 if (searchScope.equals("OBJECT_SCOPE")) {
163 scope = SearchControls.OBJECT_SCOPE;
164 } else if (searchScope.equals("ONELEVEL_SCOPE")) {
165 scope = SearchControls.ONELEVEL_SCOPE;
166 } else if (searchScope.equals("SUBTREE_SCOPE")) {
167 scope = SearchControls.SUBTREE_SCOPE;
168 } else {
169 throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
170 }
171 searchControls.setSearchScope(scope);
172 }
173
174 /**
175 * sets the LDAP search filter as defined in RFC 2254
176 *
177 * @param _searchFilter
178 * LDAP search filter
179 * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
180 */
181 public void setSearchFilter(String searchFilter) {
182 this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
183 }
184
185 /**
186 * enables/disable a persistent search to the LDAP server as defined in
187 * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
188 *
189 * @param _searchEventListener
190 * enable = true, disable = false (default)
191 * @see <a
192 * href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
193 */
194 public void setSearchEventListener(boolean searchEventListener) {
195 this.searchEventListener = searchEventListener;
196 }
197
198 /**
199 * start the connector
200 */
201 public void start() throws Exception {
202 LOG.info("connecting...");
203 Hashtable<String, String> env = new Hashtable<String, String>();
204 env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
205 this.ldapURI = getUri();
206 LOG.debug(" URI [" + this.ldapURI + "]");
207 env.put(Context.PROVIDER_URL, this.ldapURI.toString());
208 if (anonymousAuthentication) {
209 LOG.debug(" login credentials [anonymous]");
210 env.put(Context.SECURITY_AUTHENTICATION, "none");
211 } else {
212 LOG.debug(" login credentials [" + user + ":******]");
213 env.put(Context.SECURITY_PRINCIPAL, user);
214 env.put(Context.SECURITY_CREDENTIALS, password);
215 }
216 boolean isConnected = false;
217 while (!isConnected) {
218 try {
219 context = new InitialDirContext(env);
220 isConnected = true;
221 } catch (CommunicationException err) {
222 if (failover) {
223 this.ldapURI = getUri();
224 LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
225 env.put(Context.PROVIDER_URL, this.ldapURI.toString());
226 Thread.sleep(curReconnectDelay);
227 curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
228 } else {
229 throw err;
230 }
231 }
232 }
233
234 // add connectors from search results
235 LOG.info("searching for network connectors...");
236 LOG.debug(" base [" + base + "]");
237 LOG.debug(" filter [" + searchFilter + "]");
238 LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
239 NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
240 while (results.hasMore()) {
241 addConnector(results.next());
242 }
243
244 // register persistent search event listener
245 if (searchEventListener) {
246 LOG.info("registering persistent search listener...");
247 EventDirContext eventContext = (EventDirContext) context.lookup("");
248 eventContext.addNamingListener(base, searchFilter, searchControls, this);
249 } else { // otherwise close context (i.e. connection as it is no longer needed)
250 context.close();
251 }
252 }
253
254 /**
255 * stop the connector
256 */
257 public void stop() throws Exception {
258 LOG.info("stopping context...");
259 for (NetworkConnector connector : connectorMap.values()) {
260 connector.stop();
261 }
262 connectorMap.clear();
263 referenceMap.clear();
264 uuidMap.clear();
265 context.close();
266 }
267
268 public String toString() {
269 return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
270 }
271
272 /**
273 * add connector of the given URI
274 *
275 * @param result
276 * search result of connector to add
277 */
278 protected synchronized void addConnector(SearchResult result) throws Exception {
279 String uuid = toUUID(result);
280 if (uuidMap.containsKey(uuid)) {
281 LOG.warn("connector already regsitered for UUID [" + uuid + "]");
282 return;
283 }
284
285 URI connectorURI = toURI(result);
286 if (connectorMap.containsKey(connectorURI)) {
287 int referenceCount = referenceMap.get(connectorURI) + 1;
288 LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
289 referenceMap.put(connectorURI, referenceCount);
290 uuidMap.put(uuid, connectorURI);
291 return;
292 }
293
294 // FIXME: disable JMX listing of LDAP managed connectors, we will
295 // want to map/manage these differently in the future
296 // boolean useJMX = getBrokerService().isUseJmx();
297 // getBrokerService().setUseJmx(false);
298 NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
299 // getBrokerService().setUseJmx(useJMX);
300
301 // Propagate standard connector properties that may have been set via XML
302 connector.setDynamicOnly(isDynamicOnly());
303 connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
304 connector.setNetworkTTL(getNetworkTTL());
305 connector.setConduitSubscriptions(isConduitSubscriptions());
306 connector.setExcludedDestinations(getExcludedDestinations());
307 connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
308 connector.setDuplex(isDuplex());
309
310 // XXX: set in the BrokerService.startAllConnectors method and is
311 // required to prevent remote broker exceptions upon connection
312 connector.setLocalUri(getBrokerService().getVmConnectorURI());
313 connector.setBrokerName(getBrokerService().getBrokerName());
314 connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
315
316 // start network connector
317 connectorMap.put(connectorURI, connector);
318 referenceMap.put(connectorURI, 1);
319 uuidMap.put(uuid, connectorURI);
320 connector.start();
321 LOG.info("connector added with URI [" + connectorURI + "]");
322 }
323
324 /**
325 * remove connector of the given URI
326 *
327 * @param result
328 * search result of connector to remove
329 */
330 protected synchronized void removeConnector(SearchResult result) throws Exception {
331 String uuid = toUUID(result);
332 if (!uuidMap.containsKey(uuid)) {
333 LOG.warn("connector not regsitered for UUID [" + uuid + "]");
334 return;
335 }
336
337 URI connectorURI = uuidMap.get(uuid);
338 if (!connectorMap.containsKey(connectorURI)) {
339 LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
340 return;
341 }
342
343 int referenceCount = referenceMap.get(connectorURI) - 1;
344 referenceMap.put(connectorURI, referenceCount);
345 uuidMap.remove(uuid);
346 LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
347
348 if (referenceCount > 0) {
349 return;
350 }
351
352 NetworkConnector connector = connectorMap.remove(connectorURI);
353 connector.stop();
354 LOG.info("connector removed with URI [" + connectorURI + "]");
355 }
356
357 /**
358 * convert search result into URI
359 *
360 * @param result
361 * search result to convert to URI
362 */
363 protected URI toURI(SearchResult result) throws Exception {
364 Attributes attributes = result.getAttributes();
365 String address = (String) attributes.get("iphostnumber").get();
366 String port = (String) attributes.get("ipserviceport").get();
367 String protocol = (String) attributes.get("ipserviceprotocol").get();
368 URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
369 LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
370 return connectorURI;
371 }
372
373 /**
374 * convert search result into URI
375 *
376 * @param result
377 * search result to convert to URI
378 */
379 protected String toUUID(SearchResult result) {
380 String uuid = result.getNameInNamespace();
381 LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
382 return uuid;
383 }
384
385 /**
386 * invoked when an entry has been added during a persistent search
387 */
388 public void objectAdded(NamingEvent event) {
389 LOG.debug("entry added");
390 try {
391 addConnector((SearchResult) event.getNewBinding());
392 } catch (Exception err) {
393 LOG.error("ERR: caught unexpected exception", err);
394 }
395 }
396
397 /**
398 * invoked when an entry has been removed during a persistent search
399 */
400 public void objectRemoved(NamingEvent event) {
401 LOG.debug("entry removed");
402 try {
403 removeConnector((SearchResult) event.getOldBinding());
404 } catch (Exception err) {
405 LOG.error("ERR: caught unexpected exception", err);
406 }
407 }
408
409 /**
410 * invoked when an entry has been renamed during a persistent search
411 */
412 public void objectRenamed(NamingEvent event) {
413 LOG.debug("entry renamed");
414 // XXX: getNameInNamespace method does not seem to work properly,
415 // but getName seems to provide the result we want
416 String uuidOld = event.getOldBinding().getName();
417 String uuidNew = event.getNewBinding().getName();
418 URI connectorURI = uuidMap.remove(uuidOld);
419 uuidMap.put(uuidNew, connectorURI);
420 LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
421 }
422
423 /**
424 * invoked when an entry has been changed during a persistent search
425 */
426 public void objectChanged(NamingEvent event) {
427 LOG.debug("entry changed");
428 try {
429 SearchResult result = (SearchResult) event.getNewBinding();
430 removeConnector(result);
431 addConnector(result);
432 } catch (Exception err) {
433 LOG.error("ERR: caught unexpected exception", err);
434 }
435 }
436
437 /**
438 * invoked when an exception has occurred during a persistent search
439 */
440 public void namingExceptionThrown(NamingExceptionEvent event) {
441 LOG.error("ERR: caught unexpected exception", event.getException());
442 }
443 }