001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.peer;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.activemq.broker.BrokerFactoryHandler;
027    import org.apache.activemq.broker.BrokerService;
028    import org.apache.activemq.broker.TransportConnector;
029    import org.apache.activemq.transport.Transport;
030    import org.apache.activemq.transport.TransportFactory;
031    import org.apache.activemq.transport.TransportServer;
032    import org.apache.activemq.transport.vm.VMTransportFactory;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.util.IdGenerator;
035    import org.apache.activemq.util.IntrospectionSupport;
036    import org.apache.activemq.util.URISupport;
037    
038    public class PeerTransportFactory extends TransportFactory {
039    
040        public static final ConcurrentHashMap BROKERS = new ConcurrentHashMap();
041        public static final ConcurrentHashMap CONNECTORS = new ConcurrentHashMap();
042        public static final ConcurrentHashMap SERVERS = new ConcurrentHashMap();
043        private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-");
044    
045        public Transport doConnect(URI location) throws Exception {
046            VMTransportFactory vmTransportFactory = createTransportFactory(location);
047            return vmTransportFactory.doConnect(location);
048        }
049    
050        public Transport doCompositeConnect(URI location) throws Exception {
051            VMTransportFactory vmTransportFactory = createTransportFactory(location);
052            return vmTransportFactory.doCompositeConnect(location);
053        }
054    
055        /**
056         * @param location
057         * @return the converted URI
058         * @throws URISyntaxException
059         */
060        private VMTransportFactory createTransportFactory(URI location) throws IOException {
061            try {
062                String group = location.getHost();
063                String broker = URISupport.stripPrefix(location.getPath(), "/");
064    
065                if (group == null) {
066                    group = "default";
067                }
068                if (broker == null || broker.length() == 0) {
069                    broker = ID_GENERATOR.generateSanitizedId();
070                }
071    
072                final Map<String, String> brokerOptions = new HashMap<String, String>(URISupport.parseParameters(location));
073                if (!brokerOptions.containsKey("persistent")) {
074                    brokerOptions.put("persistent", "false");
075                }
076    
077                final URI finalLocation = new URI("vm://" + broker);
078                final String finalBroker = broker;
079                final String finalGroup = group;
080                VMTransportFactory rc = new VMTransportFactory() {
081                    public Transport doConnect(URI ignore) throws Exception {
082                        return super.doConnect(finalLocation);
083                    };
084    
085                    public Transport doCompositeConnect(URI ignore) throws Exception {
086                        return super.doCompositeConnect(finalLocation);
087                    };
088                };
089                rc.setBrokerFactoryHandler(new BrokerFactoryHandler() {
090                    public BrokerService createBroker(URI brokerURI) throws Exception {
091                        BrokerService service = new BrokerService();
092                        IntrospectionSupport.setProperties(service, brokerOptions);
093                        service.setBrokerName(finalBroker);
094                        TransportConnector c = service.addConnector("tcp://0.0.0.0:0");
095                        c.setDiscoveryUri(new URI("multicast://default?group=" + finalGroup));
096                        service.addNetworkConnector("multicast://default?group=" + finalGroup);
097                        return service;
098                    }
099                });
100                return rc;
101    
102            } catch (URISyntaxException e) {
103                throw IOExceptionSupport.create(e);
104            }
105        }
106    
107        public TransportServer doBind(URI location) throws IOException {
108            throw new IOException("This protocol does not support being bound.");
109        }
110    
111    }