001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.fanout;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.HashMap;
023    import java.util.Map;
024    
025    import org.apache.activemq.transport.MutexTransport;
026    import org.apache.activemq.transport.ResponseCorrelator;
027    import org.apache.activemq.transport.Transport;
028    import org.apache.activemq.transport.TransportFactory;
029    import org.apache.activemq.transport.TransportServer;
030    import org.apache.activemq.transport.discovery.DiscoveryTransport;
031    import org.apache.activemq.transport.discovery.DiscoveryTransportFactory;
032    import org.apache.activemq.util.IntrospectionSupport;
033    import org.apache.activemq.util.URISupport;
034    import org.apache.activemq.util.URISupport.CompositeData;
035    
036    public class FanoutTransportFactory extends TransportFactory {
037    
038        public Transport doConnect(URI location) throws IOException {
039            try {
040                Transport transport = createTransport(location);
041                transport = new MutexTransport(transport);
042                transport = new ResponseCorrelator(transport);
043                return transport;
044            } catch (URISyntaxException e) {
045                throw new IOException("Invalid location: " + location);
046            }
047        }
048    
049        public Transport doCompositeConnect(URI location) throws IOException {
050            try {
051                return createTransport(location);
052            } catch (URISyntaxException e) {
053                throw new IOException("Invalid location: " + location);
054            }
055        }
056    
057        /**
058         * @param location
059         * @return
060         * @throws IOException
061         * @throws URISyntaxException
062         */
063        public Transport createTransport(URI location) throws IOException, URISyntaxException {
064            CompositeData compositeData = URISupport.parseComposite(location);
065            Map<String, String> parameters = compositeData.getParameters();
066            FanoutTransport fanoutTransport = createTransport(parameters);        
067            DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(fanoutTransport, compositeData, parameters);
068            return discoveryTransport;
069        }
070    
071        public FanoutTransport createTransport(Map<String,String> parameters) throws IOException {
072            FanoutTransport transport = new FanoutTransport();
073            IntrospectionSupport.setProperties(transport, parameters);
074            return transport;
075        }
076    
077        public TransportServer doBind(URI location) throws IOException {
078            throw new IOException("Invalid server URI: " + location);
079        }
080    
081    }