001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.discovery;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024
025import org.apache.activemq.command.DiscoveryEvent;
026import org.apache.activemq.transport.CompositeTransport;
027import org.apache.activemq.transport.TransportFilter;
028import org.apache.activemq.util.ServiceStopper;
029import org.apache.activemq.util.Suspendable;
030import org.apache.activemq.util.URISupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * A {@link TransportFilter} which uses a {@link DiscoveryAgent} to
036 * discover remote broker instances and dynamically connect to them.
037 */
038public class DiscoveryTransport extends TransportFilter implements DiscoveryListener {
039
040    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryTransport.class);
041
042    private final CompositeTransport next;
043    private DiscoveryAgent discoveryAgent;
044    private final ConcurrentMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
045
046    private Map<String, String> parameters;
047
048    public DiscoveryTransport(CompositeTransport next) {
049        super(next);
050        this.next = next;
051    }
052
053    @Override
054    public void start() throws Exception {
055        if (discoveryAgent == null) {
056            throw new IllegalStateException("discoveryAgent not configured");
057        }
058
059        // lets pass into the agent the broker name and connection details
060        discoveryAgent.setDiscoveryListener(this);
061        discoveryAgent.start();
062        next.start();
063    }
064
065    @Override
066    public void stop() throws Exception {
067        ServiceStopper ss = new ServiceStopper();
068        ss.stop(discoveryAgent);
069        ss.stop(next);
070        ss.throwFirstException();
071    }
072
073    @Override
074    public void onServiceAdd(DiscoveryEvent event) {
075        String url = event.getServiceName();
076        if (url != null) {
077            try {
078                URI uri = new URI(url);
079                LOG.info("Adding new broker connection URL: " + uri);
080                uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX);
081                serviceURIs.put(event.getServiceName(), uri);
082                next.add(false,new URI[] {uri});
083            } catch (URISyntaxException e) {
084                LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
085            }
086        }
087    }
088
089    @Override
090    public void onServiceRemove(DiscoveryEvent event) {
091        URI uri = serviceURIs.get(event.getServiceName());
092        if (uri != null) {
093            next.remove(false,new URI[] {uri});
094        }
095    }
096
097    public DiscoveryAgent getDiscoveryAgent() {
098        return discoveryAgent;
099    }
100
101    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
102        this.discoveryAgent = discoveryAgent;
103    }
104
105    public void setParameters(Map<String, String> parameters) {
106       this.parameters = parameters;
107    }
108
109    @Override
110    public void transportResumed() {
111        if( discoveryAgent instanceof Suspendable ) {
112            try {
113                ((Suspendable)discoveryAgent).suspend();
114            } catch (Exception e) {
115                LOG.warn("Exception suspending discoverAgent: ", discoveryAgent);
116            }
117        }
118        super.transportResumed();
119    }
120
121    @Override
122    public void transportInterupted() {
123        if( discoveryAgent instanceof Suspendable ) {
124            try {
125                ((Suspendable)discoveryAgent).resume();
126            } catch (Exception e) {
127                LOG.warn("Exception resuming discoverAgent: ", discoveryAgent);
128            }
129        }
130        super.transportInterupted();
131    }
132}