001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.activemq.broker.BrokerFactory;
027    import org.apache.activemq.broker.BrokerFactoryHandler;
028    import org.apache.activemq.broker.BrokerRegistry;
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.TransportConnector;
031    import org.apache.activemq.transport.MarshallingTransportFilter;
032    import org.apache.activemq.transport.Transport;
033    import org.apache.activemq.transport.TransportFactory;
034    import org.apache.activemq.transport.TransportServer;
035    import org.apache.activemq.util.IOExceptionSupport;
036    import org.apache.activemq.util.IntrospectionSupport;
037    import org.apache.activemq.util.ServiceSupport;
038    import org.apache.activemq.util.URISupport;
039    import org.apache.activemq.util.URISupport.CompositeData;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    import org.slf4j.MDC;
043    
044    public class VMTransportFactory extends TransportFactory {
045        
046        public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
047        public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
048        public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
049        private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
050        
051        BrokerFactoryHandler brokerFactoryHandler;
052    
053        public Transport doConnect(URI location) throws Exception {
054            return VMTransportServer.configure(doCompositeConnect(location));
055        }
056    
057        public Transport doCompositeConnect(URI location) throws Exception {
058            URI brokerURI;
059            String host;
060            Map<String, String> options;
061            boolean create = true;
062            int waitForStart = -1;
063            CompositeData data = URISupport.parseComposite(location);
064            if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
065                brokerURI = data.getComponents()[0];
066                CompositeData brokerData = URISupport.parseComposite(brokerURI);
067                host = (String)brokerData.getParameters().get("brokerName");
068                if (host == null) {
069                    host = "localhost";
070                }
071                if (brokerData.getPath() != null) {
072                    host = brokerData.getPath();
073                }
074                options = data.getParameters();
075                location = new URI("vm://" + host);
076            } else {
077                // If using the less complex vm://localhost?broker.persistent=true
078                // form
079                try {
080                    host = extractHost(location);
081                    options = URISupport.parseParameters(location);
082                    String config = (String)options.remove("brokerConfig");
083                    if (config != null) {
084                        brokerURI = new URI(config);
085                    } else {
086                        Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
087                        brokerURI = new URI("broker://()/" + host + "?"
088                                            + URISupport.createQueryString(brokerOptions));
089                    }
090                    if ("false".equals(options.remove("create"))) {
091                        create = false;
092                    }
093                    String waitForStartString = options.remove("waitForStart");
094                    if (waitForStartString != null) {
095                        waitForStart = Integer.parseInt(waitForStartString);
096                    }
097                } catch (URISyntaxException e1) {
098                    throw IOExceptionSupport.create(e1);
099                }
100                location = new URI("vm://" + host);
101            }
102            if (host == null) {
103                host = "localhost";
104            }
105            VMTransportServer server = SERVERS.get(host);
106            // validate the broker is still active
107            if (!validateBroker(host) || server == null) {
108                BrokerService broker = null;
109                // Synchronize on the registry so that multiple concurrent threads
110                // doing this do not think that the broker has not been created and
111                // cause multiple brokers to be started.
112                synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
113                    broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
114                    if (broker == null) {
115                        if (!create) {
116                            throw new IOException("Broker named '" + host + "' does not exist.");
117                        }
118                        try {
119                            if (brokerFactoryHandler != null) {
120                                broker = brokerFactoryHandler.createBroker(brokerURI);
121                            } else {
122                                broker = BrokerFactory.createBroker(brokerURI);
123                            }
124                            broker.start();
125                            MDC.put("activemq.broker", broker.getBrokerName());
126                        } catch (URISyntaxException e) {
127                            throw IOExceptionSupport.create(e);
128                        }
129                        BROKERS.put(host, broker);
130                        BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
131                    }
132    
133                    server = SERVERS.get(host);
134                    if (server == null) {
135                        server = (VMTransportServer)bind(location, true);
136                        TransportConnector connector = new TransportConnector(server);
137                        connector.setBrokerService(broker);
138                        connector.setUri(location);
139                        connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
140                        connector.start();
141                        CONNECTORS.put(host, connector);
142                    }
143    
144                }
145            }
146    
147            VMTransport vmtransport = server.connect();
148            IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options));
149            IntrospectionSupport.setProperties(vmtransport, options);
150            Transport transport = vmtransport;
151            if (vmtransport.isMarshal()) {
152                Map<String, String> optionsCopy = new HashMap<String, String>(options);
153                transport = new MarshallingTransportFilter(transport, createWireFormat(options),
154                                                           createWireFormat(optionsCopy));
155            }
156            if (!options.isEmpty()) {
157                throw new IllegalArgumentException("Invalid connect parameters: " + options);
158            }
159            return transport;
160        }
161    
162       private static String extractHost(URI location) {
163           String host = location.getHost();
164           if (host == null || host.length() == 0) {
165               host = location.getAuthority();
166               if (host == null || host.length() == 0) {
167                   host = "localhost";
168               }
169           }
170           return host;
171        }
172    
173    /**
174        * @param registry
175        * @param brokerName
176        * @param waitForStart - time in milliseconds to wait for a broker to appear
177        * @return
178        */
179        private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
180            BrokerService broker = null;
181            synchronized(registry.getRegistryMutext()) {
182                broker = registry.lookup(brokerName);
183                if (broker == null && waitForStart > 0) {
184                    final long expiry = System.currentTimeMillis() + waitForStart;
185                    while (broker == null  && expiry > System.currentTimeMillis()) {
186                        long timeout = Math.max(0, expiry - System.currentTimeMillis());
187                        try {
188                            LOG.debug("waiting for broker named: " + brokerName + " to start");
189                            registry.getRegistryMutext().wait(timeout);
190                        } catch (InterruptedException ignored) {
191                        }
192                        broker = registry.lookup(brokerName);
193                    }
194                }
195            }
196            return broker;
197        }
198    
199        public TransportServer doBind(URI location) throws IOException {
200            return bind(location, false);
201        }
202    
203        /**
204         * @param location
205         * @return the TransportServer
206         * @throws IOException
207         */
208        private TransportServer bind(URI location, boolean dispose) throws IOException {
209            String host = extractHost(location);
210            LOG.debug("binding to broker: " + host);
211            VMTransportServer server = new VMTransportServer(location, dispose);
212            Object currentBoundValue = SERVERS.get(host);
213            if (currentBoundValue != null) {
214                throw new IOException("VMTransportServer already bound at: " + location);
215            }
216            SERVERS.put(host, server);
217            return server;
218        }
219    
220        public static void stopped(VMTransportServer server) {
221            String host = extractHost(server.getBindURI());
222            stopped(host);
223        }
224    
225        public static void stopped(String host) {
226            SERVERS.remove(host);
227            TransportConnector connector = CONNECTORS.remove(host);
228            if (connector != null) {
229                LOG.debug("Shutting down VM connectors for broker: " + host);
230                ServiceSupport.dispose(connector);
231                BrokerService broker = BROKERS.remove(host);
232                if (broker != null) {
233                    ServiceSupport.dispose(broker);
234                }
235                MDC.remove("activemq.broker");
236            }
237        }
238    
239        public BrokerFactoryHandler getBrokerFactoryHandler() {
240            return brokerFactoryHandler;
241        }
242    
243        public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
244            this.brokerFactoryHandler = brokerFactoryHandler;
245        }
246    
247        private boolean validateBroker(String host) {
248            boolean result = true;
249            if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) {
250                // check the broker is still in the BrokerRegistry
251                TransportConnector connector = CONNECTORS.get(host);
252                if (BrokerRegistry.getInstance().lookup(host) == null
253                    || (connector != null && connector.getBroker().isStopped())) {
254                    result = false;
255                    // clean-up
256                    BROKERS.remove(host);
257                    SERVERS.remove(host);
258                    if (connector != null) {
259                        CONNECTORS.remove(host);
260                        if (connector != null) {
261                            ServiceSupport.dispose(connector);
262                        }
263                    }
264                }
265            }
266            return result;
267        }
268    }