001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    import java.util.Hashtable;
021    import java.util.Map;
022    import java.util.concurrent.ConcurrentHashMap;
023    
024    import javax.naming.CommunicationException;
025    import javax.naming.Context;
026    import javax.naming.NamingEnumeration;
027    import javax.naming.directory.Attributes;
028    import javax.naming.directory.DirContext;
029    import javax.naming.directory.InitialDirContext;
030    import javax.naming.directory.SearchControls;
031    import javax.naming.directory.SearchResult;
032    import javax.naming.event.EventDirContext;
033    import javax.naming.event.NamespaceChangeListener;
034    import javax.naming.event.NamingEvent;
035    import javax.naming.event.NamingExceptionEvent;
036    import javax.naming.event.ObjectChangeListener;
037    
038    import org.apache.activemq.util.URISupport;
039    import org.apache.activemq.util.URISupport.CompositeData;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     * class to create dynamic network connectors listed in an directory server
045     * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the
046     * directory server must implement the ipHost and ipService objectClasses as
047     * defined in RFC 2307.
048     *
049     * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
050     * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
051     *
052     * @org.apache.xbean.XBean element="ldapNetworkConnector"
053     */
054    public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener {
055        private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
056    
057        // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
058        private static final String REQUIRED_OBJECT_CLASS_FILTER =
059                "(&(objectClass=ipHost)(objectClass=ipService))";
060    
061        // connection
062        private URI[] availableURIs = null;
063        private int availableURIsIndex = 0;
064        private String base = null;
065        private boolean failover = false;
066        private long curReconnectDelay = 1000; /* 1 sec */
067        private long maxReconnectDelay = 30000; /* 30 sec */
068    
069        // authentication
070        private String user = null;
071        private String password = null;
072        private boolean anonymousAuthentication = false;
073    
074        // search
075        private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
076        private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
077        private boolean searchEventListener = false;
078    
079        // connector management
080        private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
081        private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
082        private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();
083    
084        // local context
085        private DirContext context = null;
086        // currently in use URI
087        private URI ldapURI = null;
088    
089        /**
090         * returns the next URI from the configured list
091         *
092         * @return random URI from the configured list
093         */
094        public URI getUri() {
095            return availableURIs[++availableURIsIndex % availableURIs.length];
096        }
097    
098        /**
099         * sets the LDAP server URI
100         *
101         * @param _uri
102         *            LDAP server URI
103         */
104        public void setUri(URI uri) throws Exception {
105            CompositeData data = URISupport.parseComposite(uri);
106            if (data.getScheme().equals("failover")) {
107                availableURIs = data.getComponents();
108                failover = true;
109            } else {
110                availableURIs = new URI[] { uri };
111            }
112        }
113    
114        /**
115         * sets the base LDAP dn used for lookup operations
116         *
117         * @param _base
118         *            LDAP base dn
119         */
120        public void setBase(String base) {
121            this.base = base;
122        }
123    
124        /**
125         * sets the LDAP user for access credentials
126         *
127         * @param _user
128         *            LDAP dn of user
129         */
130        public void setUser(String user) {
131            this.user = user;
132        }
133    
134        /**
135         * sets the LDAP password for access credentials
136         *
137         * @param _password
138         *            user password
139         */
140        public void setPassword(String password) {
141            this.password = password;
142        }
143    
144        /**
145         * sets LDAP anonymous authentication access credentials
146         *
147         * @param _anonymousAuthentication
148         *            set to true to use anonymous authentication
149         */
150        public void setAnonymousAuthentication(boolean anonymousAuthentication) {
151            this.anonymousAuthentication = anonymousAuthentication;
152        }
153    
154        /**
155         * sets the LDAP search scope
156         *
157         * @param _searchScope
158         *            LDAP JNDI search scope
159         */
160        public void setSearchScope(String searchScope) throws Exception {
161            int scope;
162            if (searchScope.equals("OBJECT_SCOPE")) {
163                scope = SearchControls.OBJECT_SCOPE;
164            } else if (searchScope.equals("ONELEVEL_SCOPE")) {
165                scope = SearchControls.ONELEVEL_SCOPE;
166            } else if (searchScope.equals("SUBTREE_SCOPE")) {
167                scope = SearchControls.SUBTREE_SCOPE;
168            } else {
169                throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
170            }
171            searchControls.setSearchScope(scope);
172        }
173    
174        /**
175         * sets the LDAP search filter as defined in RFC 2254
176         *
177         * @param _searchFilter
178         *            LDAP search filter
179         * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
180         */
181        public void setSearchFilter(String searchFilter) {
182            this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
183        }
184    
185        /**
186         * enables/disable a persistent search to the LDAP server as defined in
187         * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
188         *
189         * @param _searchEventListener
190         *            enable = true, disable = false (default)
191         * @see <a
192         *      href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
193         */
194        public void setSearchEventListener(boolean searchEventListener) {
195            this.searchEventListener = searchEventListener;
196        }
197    
198        /**
199         * start the connector
200         */
201        public void start() throws Exception {
202            LOG.info("connecting...");
203            Hashtable<String, String> env = new Hashtable<String, String>();
204            env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
205            this.ldapURI = getUri();
206            LOG.debug("    URI [" + this.ldapURI + "]");
207            env.put(Context.PROVIDER_URL, this.ldapURI.toString());
208            if (anonymousAuthentication) {
209                LOG.debug("    login credentials [anonymous]");
210                env.put(Context.SECURITY_AUTHENTICATION, "none");
211            } else {
212                LOG.debug("    login credentials [" + user + ":******]");
213                env.put(Context.SECURITY_PRINCIPAL, user);
214                env.put(Context.SECURITY_CREDENTIALS, password);
215            }
216            boolean isConnected = false;
217            while (!isConnected) {
218                try {
219                    context = new InitialDirContext(env);
220                    isConnected = true;
221                } catch (CommunicationException err) {
222                    if (failover) {
223                        this.ldapURI = getUri();
224                        LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
225                        env.put(Context.PROVIDER_URL, this.ldapURI.toString());
226                        Thread.sleep(curReconnectDelay);
227                        curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
228                    } else {
229                        throw err;
230                    }
231                }
232            }
233    
234            // add connectors from search results
235            LOG.info("searching for network connectors...");
236            LOG.debug("    base   [" + base + "]");
237            LOG.debug("    filter [" + searchFilter + "]");
238            LOG.debug("    scope  [" + searchControls.getSearchScope() + "]");
239            NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
240            while (results.hasMore()) {
241                addConnector(results.next());
242            }
243    
244            // register persistent search event listener
245            if (searchEventListener) {
246                LOG.info("registering persistent search listener...");
247                EventDirContext eventContext = (EventDirContext) context.lookup("");
248                eventContext.addNamingListener(base, searchFilter, searchControls, this);
249            } else { // otherwise close context (i.e. connection as it is no longer needed)
250                context.close();
251            }
252        }
253    
254        /**
255         * stop the connector
256         */
257        public void stop() throws Exception {
258            LOG.info("stopping context...");
259            for (NetworkConnector connector : connectorMap.values()) {
260                connector.stop();
261            }
262            connectorMap.clear();
263            referenceMap.clear();
264            uuidMap.clear();
265            context.close();
266        }
267    
268        public String toString() {
269            return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
270        }
271    
272        /**
273         * add connector of the given URI
274         *
275         * @param result
276         *            search result of connector to add
277         */
278        protected synchronized void addConnector(SearchResult result) throws Exception {
279            String uuid = toUUID(result);
280            if (uuidMap.containsKey(uuid)) {
281                LOG.warn("connector already regsitered for UUID [" + uuid + "]");
282                return;
283            }
284    
285            URI connectorURI = toURI(result);
286            if (connectorMap.containsKey(connectorURI)) {
287                int referenceCount = referenceMap.get(connectorURI) + 1;
288                LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
289                referenceMap.put(connectorURI, referenceCount);
290                uuidMap.put(uuid, connectorURI);
291                return;
292            }
293    
294            // FIXME: disable JMX listing of LDAP managed connectors, we will
295            // want to map/manage these differently in the future
296            // boolean useJMX = getBrokerService().isUseJmx();
297            // getBrokerService().setUseJmx(false);
298            NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
299            // getBrokerService().setUseJmx(useJMX);
300    
301            // Propagate standard connector properties that may have been set via XML
302            connector.setDynamicOnly(isDynamicOnly());
303            connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
304            connector.setNetworkTTL(getNetworkTTL());
305            connector.setConduitSubscriptions(isConduitSubscriptions());
306            connector.setExcludedDestinations(getExcludedDestinations());
307            connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
308            connector.setDuplex(isDuplex());
309    
310            // XXX: set in the BrokerService.startAllConnectors method and is
311            // required to prevent remote broker exceptions upon connection
312            connector.setLocalUri(getBrokerService().getVmConnectorURI());
313            connector.setBrokerName(getBrokerService().getBrokerName());
314            connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
315    
316            // start network connector
317            connectorMap.put(connectorURI, connector);
318            referenceMap.put(connectorURI, 1);
319            uuidMap.put(uuid, connectorURI);
320            connector.start();
321            LOG.info("connector added with URI [" + connectorURI + "]");
322        }
323    
324        /**
325         * remove connector of the given URI
326         *
327         * @param result
328         *            search result of connector to remove
329         */
330        protected synchronized void removeConnector(SearchResult result) throws Exception {
331            String uuid = toUUID(result);
332            if (!uuidMap.containsKey(uuid)) {
333                LOG.warn("connector not regsitered for UUID [" + uuid + "]");
334                return;
335            }
336    
337            URI connectorURI = uuidMap.get(uuid);
338            if (!connectorMap.containsKey(connectorURI)) {
339                LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
340                return;
341            }
342    
343            int referenceCount = referenceMap.get(connectorURI) - 1;
344            referenceMap.put(connectorURI, referenceCount);
345            uuidMap.remove(uuid);
346            LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
347    
348            if (referenceCount > 0) {
349                return;
350            }
351    
352            NetworkConnector connector = connectorMap.remove(connectorURI);
353            connector.stop();
354            LOG.info("connector removed with URI [" + connectorURI + "]");
355        }
356    
357        /**
358         * convert search result into URI
359         *
360         * @param result
361         *            search result to convert to URI
362         */
363        protected URI toURI(SearchResult result) throws Exception {
364            Attributes attributes = result.getAttributes();
365            String address = (String) attributes.get("iphostnumber").get();
366            String port = (String) attributes.get("ipserviceport").get();
367            String protocol = (String) attributes.get("ipserviceprotocol").get();
368            URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
369            LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
370            return connectorURI;
371        }
372    
373        /**
374         * convert search result into URI
375         *
376         * @param result
377         *            search result to convert to URI
378         */
379        protected String toUUID(SearchResult result) {
380            String uuid = result.getNameInNamespace();
381            LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
382            return uuid;
383        }
384    
385        /**
386         * invoked when an entry has been added during a persistent search
387         */
388        public void objectAdded(NamingEvent event) {
389            LOG.debug("entry added");
390            try {
391                addConnector((SearchResult) event.getNewBinding());
392            } catch (Exception err) {
393                LOG.error("ERR: caught unexpected exception", err);
394            }
395        }
396    
397        /**
398         * invoked when an entry has been removed during a persistent search
399         */
400        public void objectRemoved(NamingEvent event) {
401            LOG.debug("entry removed");
402            try {
403                removeConnector((SearchResult) event.getOldBinding());
404            } catch (Exception err) {
405                LOG.error("ERR: caught unexpected exception", err);
406            }
407        }
408    
409        /**
410         * invoked when an entry has been renamed during a persistent search
411         */
412        public void objectRenamed(NamingEvent event) {
413            LOG.debug("entry renamed");
414            // XXX: getNameInNamespace method does not seem to work properly,
415            // but getName seems to provide the result we want
416            String uuidOld = event.getOldBinding().getName();
417            String uuidNew = event.getNewBinding().getName();
418            URI connectorURI = uuidMap.remove(uuidOld);
419            uuidMap.put(uuidNew, connectorURI);
420            LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
421        }
422    
423        /**
424         * invoked when an entry has been changed during a persistent search
425         */
426        public void objectChanged(NamingEvent event) {
427            LOG.debug("entry changed");
428            try {
429                SearchResult result = (SearchResult) event.getNewBinding();
430                removeConnector(result);
431                addConnector(result);
432            } catch (Exception err) {
433                LOG.error("ERR: caught unexpected exception", err);
434            }
435        }
436    
437        /**
438         * invoked when an exception has occurred during a persistent search
439         */
440        public void namingExceptionThrown(NamingExceptionEvent event) {
441            LOG.error("ERR: caught unexpected exception", event.getException());
442        }
443    }