001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    import java.util.Hashtable;
021    import java.util.Map;
022    import java.util.concurrent.ConcurrentHashMap;
023    
024    import javax.naming.CommunicationException;
025    import javax.naming.Context;
026    import javax.naming.NamingEnumeration;
027    import javax.naming.directory.Attributes;
028    import javax.naming.directory.DirContext;
029    import javax.naming.directory.InitialDirContext;
030    import javax.naming.directory.SearchControls;
031    import javax.naming.directory.SearchResult;
032    import javax.naming.event.EventDirContext;
033    import javax.naming.event.NamespaceChangeListener;
034    import javax.naming.event.NamingEvent;
035    import javax.naming.event.NamingExceptionEvent;
036    import javax.naming.event.ObjectChangeListener;
037    
038    import org.apache.activemq.util.URISupport;
039    import org.apache.activemq.util.URISupport.CompositeData;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     * class to create dynamic network connectors listed in an directory server
045     * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the
046     * directory server must implement the ipHost and ipService objectClasses as
047     * defined in RFC 2307.
048     *
049     * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
050     * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
051     *
052     * @org.apache.xbean.XBean element="ldapNetworkConnector"
053     */
054    public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener {
055        private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
056    
057        // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
058        private static final String REQUIRED_OBJECT_CLASS_FILTER =
059                "(&(objectClass=ipHost)(objectClass=ipService))";
060    
061        // connection
062        private URI[] availableURIs = null;
063        private int availableURIsIndex = 0;
064        private String base = null;
065        private boolean failover = false;
066        private long curReconnectDelay = 1000; /* 1 sec */
067        private long maxReconnectDelay = 30000; /* 30 sec */
068    
069        // authentication
070        private String user = null;
071        private String password = null;
072        private boolean anonymousAuthentication = false;
073    
074        // search
075        private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
076        private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
077        private boolean searchEventListener = false;
078    
079        // connector management
080        private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
081        private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
082        private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();
083    
084        // local context
085        private DirContext context = null;
086        // currently in use URI
087        private URI ldapURI = null;
088    
089        /**
090         * returns the next URI from the configured list
091         *
092         * @return random URI from the configured list
093         */
094        public URI getUri() {
095            return availableURIs[++availableURIsIndex % availableURIs.length];
096        }
097    
098        /**
099         * sets the LDAP server URI
100         *
101         * @param _uri
102         *            LDAP server URI
103         */
104        public void setUri(URI uri) throws Exception {
105            CompositeData data = URISupport.parseComposite(uri);
106            if (data.getScheme().equals("failover")) {
107                availableURIs = data.getComponents();
108                failover = true;
109            } else {
110                availableURIs = new URI[] { uri };
111            }
112        }
113    
114        /**
115         * sets the base LDAP dn used for lookup operations
116         *
117         * @param _base
118         *            LDAP base dn
119         */
120        public void setBase(String base) {
121            this.base = base;
122        }
123    
124        /**
125         * sets the LDAP user for access credentials
126         *
127         * @param _user
128         *            LDAP dn of user
129         */
130        public void setUser(String user) {
131            this.user = user;
132        }
133    
134        /**
135         * sets the LDAP password for access credentials
136         *
137         * @param _password
138         *            user password
139         */
140        public void setPassword(String password) {
141            this.password = password;
142        }
143    
144        /**
145         * sets LDAP anonymous authentication access credentials
146         *
147         * @param _anonymousAuthentication
148         *            set to true to use anonymous authentication
149         */
150        public void setAnonymousAuthentication(boolean anonymousAuthentication) {
151            this.anonymousAuthentication = anonymousAuthentication;
152        }
153    
154        /**
155         * sets the LDAP search scope
156         *
157         * @param _searchScope
158         *            LDAP JNDI search scope
159         */
160        public void setSearchScope(String searchScope) throws Exception {
161            int scope;
162            if (searchScope.equals("OBJECT_SCOPE")) {
163                scope = SearchControls.OBJECT_SCOPE;
164            } else if (searchScope.equals("ONELEVEL_SCOPE")) {
165                scope = SearchControls.ONELEVEL_SCOPE;
166            } else if (searchScope.equals("SUBTREE_SCOPE")) {
167                scope = SearchControls.SUBTREE_SCOPE;
168            } else {
169                throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
170            }
171            searchControls.setSearchScope(scope);
172        }
173    
174        /**
175         * sets the LDAP search filter as defined in RFC 2254
176         *
177         * @param _searchFilter
178         *            LDAP search filter
179         * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
180         */
181        public void setSearchFilter(String searchFilter) {
182            this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
183        }
184    
185        /**
186         * enables/disable a persistent search to the LDAP server as defined in
187         * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
188         *
189         * @param _searchEventListener
190         *            enable = true, disable = false (default)
191         * @see <a
192         *      href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
193         */
194        public void setSearchEventListener(boolean searchEventListener) {
195            this.searchEventListener = searchEventListener;
196        }
197    
198        /**
199         * start the connector
200         */
201        public void start() throws Exception {
202            LOG.info("connecting...");
203            Hashtable<String, String> env = new Hashtable<String, String>();
204            env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
205            this.ldapURI = getUri();
206            LOG.debug("    URI [{}]", this.ldapURI);
207            env.put(Context.PROVIDER_URL, this.ldapURI.toString());
208            if (anonymousAuthentication) {
209                LOG.debug("    login credentials [anonymous]");
210                env.put(Context.SECURITY_AUTHENTICATION, "none");
211            } else {
212                LOG.debug("    login credentials [{}:******]", user);
213                env.put(Context.SECURITY_PRINCIPAL, user);
214                env.put(Context.SECURITY_CREDENTIALS, password);
215            }
216            boolean isConnected = false;
217            while (!isConnected) {
218                try {
219                    context = new InitialDirContext(env);
220                    isConnected = true;
221                } catch (CommunicationException err) {
222                    if (failover) {
223                        this.ldapURI = getUri();
224                        LOG.error("connection error [{}], failover connection to [{}]", env.get(Context.PROVIDER_URL), this.ldapURI.toString());
225                        env.put(Context.PROVIDER_URL, this.ldapURI.toString());
226                        Thread.sleep(curReconnectDelay);
227                        curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
228                    } else {
229                        throw err;
230                    }
231                }
232            }
233    
234            // add connectors from search results
235            LOG.info("searching for network connectors...");
236            LOG.debug("    base   [{}]", base);
237            LOG.debug("    filter [{}]", searchFilter);
238            LOG.debug("    scope  [{}]", searchControls.getSearchScope());
239            NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
240            while (results.hasMore()) {
241                addConnector(results.next());
242            }
243    
244            // register persistent search event listener
245            if (searchEventListener) {
246                LOG.info("registering persistent search listener...");
247                EventDirContext eventContext = (EventDirContext) context.lookup("");
248                eventContext.addNamingListener(base, searchFilter, searchControls, this);
249            } else { // otherwise close context (i.e. connection as it is no longer needed)
250                context.close();
251            }
252        }
253    
254        /**
255         * stop the connector
256         */
257        public void stop() throws Exception {
258            LOG.info("stopping context...");
259            for (NetworkConnector connector : connectorMap.values()) {
260                connector.stop();
261            }
262            connectorMap.clear();
263            referenceMap.clear();
264            uuidMap.clear();
265            context.close();
266        }
267    
268        public String toString() {
269            return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
270        }
271    
272        /**
273         * add connector of the given URI
274         *
275         * @param result
276         *            search result of connector to add
277         */
278        protected synchronized void addConnector(SearchResult result) throws Exception {
279            String uuid = toUUID(result);
280            if (uuidMap.containsKey(uuid)) {
281                LOG.warn("connector already regsitered for UUID [{}]", uuid);
282                return;
283            }
284    
285            URI connectorURI = toURI(result);
286            if (connectorMap.containsKey(connectorURI)) {
287                int referenceCount = referenceMap.get(connectorURI) + 1;
288                LOG.warn("connector reference added for URI [{}], UUID [{}], total reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount });
289                referenceMap.put(connectorURI, referenceCount);
290                uuidMap.put(uuid, connectorURI);
291                return;
292            }
293    
294            // FIXME: disable JMX listing of LDAP managed connectors, we will
295            // want to map/manage these differently in the future
296            // boolean useJMX = getBrokerService().isUseJmx();
297            // getBrokerService().setUseJmx(false);
298            NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
299            // getBrokerService().setUseJmx(useJMX);
300    
301            // Propagate standard connector properties that may have been set via XML
302            connector.setDynamicOnly(isDynamicOnly());
303            connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
304            connector.setNetworkTTL(getNetworkTTL());
305            connector.setConsumerTTL(getConsumerTTL());
306            connector.setMessageTTL(getMessageTTL());
307            connector.setConduitSubscriptions(isConduitSubscriptions());
308            connector.setExcludedDestinations(getExcludedDestinations());
309            connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
310            connector.setDuplex(isDuplex());
311    
312            // XXX: set in the BrokerService.startAllConnectors method and is
313            // required to prevent remote broker exceptions upon connection
314            connector.setLocalUri(getBrokerService().getVmConnectorURI());
315            connector.setBrokerName(getBrokerService().getBrokerName());
316            connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
317    
318            // start network connector
319            connectorMap.put(connectorURI, connector);
320            referenceMap.put(connectorURI, 1);
321            uuidMap.put(uuid, connectorURI);
322            connector.start();
323            LOG.info("connector added with URI [{}]", connectorURI);
324        }
325    
326        /**
327         * remove connector of the given URI
328         *
329         * @param result
330         *            search result of connector to remove
331         */
332        protected synchronized void removeConnector(SearchResult result) throws Exception {
333            String uuid = toUUID(result);
334            if (!uuidMap.containsKey(uuid)) {
335                LOG.warn("connector not registered for UUID [{}]", uuid);
336                return;
337            }
338    
339            URI connectorURI = uuidMap.get(uuid);
340            if (!connectorMap.containsKey(connectorURI)) {
341                LOG.warn("connector not registered for URI [{}]", connectorURI);
342                return;
343            }
344    
345            int referenceCount = referenceMap.get(connectorURI) - 1;
346            referenceMap.put(connectorURI, referenceCount);
347            uuidMap.remove(uuid);
348            LOG.debug("connector referenced removed for URI [{}], UUID[{}], remaining reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount });
349    
350            if (referenceCount > 0) {
351                return;
352            }
353    
354            NetworkConnector connector = connectorMap.remove(connectorURI);
355            connector.stop();
356            LOG.info("connector removed with URI [{}]", connectorURI);
357        }
358    
359        /**
360         * convert search result into URI
361         *
362         * @param result
363         *            search result to convert to URI
364         */
365        protected URI toURI(SearchResult result) throws Exception {
366            Attributes attributes = result.getAttributes();
367            String address = (String) attributes.get("iphostnumber").get();
368            String port = (String) attributes.get("ipserviceport").get();
369            String protocol = (String) attributes.get("ipserviceprotocol").get();
370            URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
371            LOG.debug("retrieved URI from SearchResult [{}]", connectorURI);
372            return connectorURI;
373        }
374    
375        /**
376         * convert search result into URI
377         *
378         * @param result
379         *            search result to convert to URI
380         */
381        protected String toUUID(SearchResult result) {
382            String uuid = result.getNameInNamespace();
383            LOG.debug("retrieved UUID from SearchResult [{}]", uuid);
384            return uuid;
385        }
386    
387        /**
388         * invoked when an entry has been added during a persistent search
389         */
390        public void objectAdded(NamingEvent event) {
391            LOG.debug("entry added");
392            try {
393                addConnector((SearchResult) event.getNewBinding());
394            } catch (Exception err) {
395                LOG.error("ERR: caught unexpected exception", err);
396            }
397        }
398    
399        /**
400         * invoked when an entry has been removed during a persistent search
401         */
402        public void objectRemoved(NamingEvent event) {
403            LOG.debug("entry removed");
404            try {
405                removeConnector((SearchResult) event.getOldBinding());
406            } catch (Exception err) {
407                LOG.error("ERR: caught unexpected exception", err);
408            }
409        }
410    
411        /**
412         * invoked when an entry has been renamed during a persistent search
413         */
414        public void objectRenamed(NamingEvent event) {
415            LOG.debug("entry renamed");
416            // XXX: getNameInNamespace method does not seem to work properly,
417            // but getName seems to provide the result we want
418            String uuidOld = event.getOldBinding().getName();
419            String uuidNew = event.getNewBinding().getName();
420            URI connectorURI = uuidMap.remove(uuidOld);
421            uuidMap.put(uuidNew, connectorURI);
422            LOG.debug("connector reference renamed for URI [{}], Old UUID [{}], New UUID [{}]", new Object[]{ connectorURI, uuidOld, uuidNew });
423        }
424    
425        /**
426         * invoked when an entry has been changed during a persistent search
427         */
428        public void objectChanged(NamingEvent event) {
429            LOG.debug("entry changed");
430            try {
431                SearchResult result = (SearchResult) event.getNewBinding();
432                removeConnector(result);
433                addConnector(result);
434            } catch (Exception err) {
435                LOG.error("ERR: caught unexpected exception", err);
436            }
437        }
438    
439        /**
440         * invoked when an exception has occurred during a persistent search
441         */
442        public void namingExceptionThrown(NamingExceptionEvent event) {
443            LOG.error("ERR: caught unexpected exception", event.getException());
444        }
445    }