001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.net.MalformedURLException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.net.UnknownHostException;
024    import java.util.HashMap;
025    import java.util.Map;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.Executor;
028    
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.BrokerServiceAware;
031    import org.apache.activemq.broker.SslContext;
032    import org.apache.activemq.util.FactoryFinder;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.util.IntrospectionSupport;
035    import org.apache.activemq.util.URISupport;
036    import org.apache.activemq.wireformat.WireFormat;
037    import org.apache.activemq.wireformat.WireFormatFactory;
038    
039    public abstract class TransportFactory {
040    
041        private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
042        private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
043        private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
044    
045        private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
046        private static final String THREAD_NAME_FILTER = "threadName";
047    
048        public abstract TransportServer doBind(URI location) throws IOException;
049    
050        public Transport doConnect(URI location, Executor ex) throws Exception {
051            return doConnect(location);
052        }
053    
054        public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
055            return doCompositeConnect(location);
056        }
057    
058        /**
059         * Creates a normal transport.
060         *
061         * @param location
062         * @return the transport
063         * @throws Exception
064         */
065        public static Transport connect(URI location) throws Exception {
066            TransportFactory tf = findTransportFactory(location);
067            return tf.doConnect(location);
068        }
069    
070        /**
071         * Creates a normal transport.
072         *
073         * @param location
074         * @param ex
075         * @return the transport
076         * @throws Exception
077         */
078        public static Transport connect(URI location, Executor ex) throws Exception {
079            TransportFactory tf = findTransportFactory(location);
080            return tf.doConnect(location, ex);
081        }
082    
083        /**
084         * Creates a slimmed down transport that is more efficient so that it can be
085         * used by composite transports like reliable and HA.
086         *
087         * @param location
088         * @return the Transport
089         * @throws Exception
090         */
091        public static Transport compositeConnect(URI location) throws Exception {
092            TransportFactory tf = findTransportFactory(location);
093            return tf.doCompositeConnect(location);
094        }
095    
096        /**
097         * Creates a slimmed down transport that is more efficient so that it can be
098         * used by composite transports like reliable and HA.
099         *
100         * @param location
101         * @param ex
102         * @return the Transport
103         * @throws Exception
104         */
105        public static Transport compositeConnect(URI location, Executor ex) throws Exception {
106            TransportFactory tf = findTransportFactory(location);
107            return tf.doCompositeConnect(location, ex);
108        }
109    
110        public static TransportServer bind(URI location) throws IOException {
111            TransportFactory tf = findTransportFactory(location);
112            return tf.doBind(location);
113        }
114    
115        public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
116            TransportFactory tf = findTransportFactory(location);
117            if( brokerService!=null && tf instanceof BrokerServiceAware ) {
118                ((BrokerServiceAware)tf).setBrokerService(brokerService);
119            }
120            try {
121                if( brokerService!=null ) {
122                    SslContext.setCurrentSslContext(brokerService.getSslContext());
123                }
124                return tf.doBind(location);
125            } finally {
126                SslContext.setCurrentSslContext(null);
127            }
128        }
129    
130        public Transport doConnect(URI location) throws Exception {
131            try {
132                Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
133                WireFormat wf = createWireFormat(options);
134                Transport transport = createTransport(location, wf);
135                Transport rc = configure(transport, wf, options);
136                if (!options.isEmpty()) {
137                    throw new IllegalArgumentException("Invalid connect parameters: " + options);
138                }
139                return rc;
140            } catch (URISyntaxException e) {
141                throw IOExceptionSupport.create(e);
142            }
143        }
144    
145        public Transport doCompositeConnect(URI location) throws Exception {
146            try {
147                Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
148                WireFormat wf = createWireFormat(options);
149                Transport transport = createTransport(location, wf);
150                Transport rc = compositeConfigure(transport, wf, options);
151                if (!options.isEmpty()) {
152                    throw new IllegalArgumentException("Invalid connect parameters: " + options);
153                }
154                return rc;
155            } catch (URISyntaxException e) {
156                throw IOExceptionSupport.create(e);
157            }
158        }
159    
160         /**
161          * Allow registration of a transport factory without wiring via META-INF classes
162         * @param scheme
163         * @param tf
164         */
165        public static void registerTransportFactory(String scheme, TransportFactory tf) {
166            TRANSPORT_FACTORYS.put(scheme, tf);
167          }
168    
169        /**
170         * Factory method to create a new transport
171         *
172         * @throws IOException
173         * @throws UnknownHostException
174         */
175        protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
176            throw new IOException("createTransport() method not implemented!");
177        }
178    
179        /**
180         * @param location
181         * @return
182         * @throws IOException
183         */
184        private static TransportFactory findTransportFactory(URI location) throws IOException {
185            String scheme = location.getScheme();
186            if (scheme == null) {
187                throw new IOException("Transport not scheme specified: [" + location + "]");
188            }
189            TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
190            if (tf == null) {
191                // Try to load if from a META-INF property.
192                try {
193                    tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
194                    TRANSPORT_FACTORYS.put(scheme, tf);
195                } catch (Throwable e) {
196                    throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
197                }
198            }
199            return tf;
200        }
201    
202        protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
203            WireFormatFactory factory = createWireFormatFactory(options);
204            WireFormat format = factory.createWireFormat();
205            return format;
206        }
207    
208        protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
209            String wireFormat = (String)options.remove("wireFormat");
210            if (wireFormat == null) {
211                wireFormat = getDefaultWireFormatType();
212            }
213    
214            try {
215                WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
216                IntrospectionSupport.setProperties(wff, options, "wireFormat.");
217                return wff;
218            } catch (Throwable e) {
219                throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
220            }
221        }
222    
223        protected String getDefaultWireFormatType() {
224            return "default";
225        }
226    
227        /**
228         * Fully configures and adds all need transport filters so that the
229         * transport can be used by the JMS client.
230         *
231         * @param transport
232         * @param wf
233         * @param options
234         * @return
235         * @throws Exception
236         */
237        @SuppressWarnings("rawtypes")
238        public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
239            transport = compositeConfigure(transport, wf, options);
240    
241            transport = new MutexTransport(transport);
242            transport = new ResponseCorrelator(transport);
243    
244            return transport;
245        }
246    
247        /**
248         * Fully configures and adds all need transport filters so that the
249         * transport can be used by the ActiveMQ message broker. The main difference
250         * between this and the configure() method is that the broker does not issue
251         * requests to the client so the ResponseCorrelator is not needed.
252         *
253         * @param transport
254         * @param format
255         * @param options
256         * @return
257         * @throws Exception
258         */
259        @SuppressWarnings("rawtypes")
260        public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
261            if (options.containsKey(THREAD_NAME_FILTER)) {
262                transport = new ThreadNameFilter(transport);
263            }
264            transport = compositeConfigure(transport, format, options);
265            transport = new MutexTransport(transport);
266            return transport;
267        }
268    
269        /**
270         * Similar to configure(...) but this avoid adding in the MutexTransport and
271         * ResponseCorrelator transport layers so that the resulting transport can
272         * more efficiently be used as part of a composite transport.
273         *
274         * @param transport
275         * @param format
276         * @param options
277         * @return
278         */
279        @SuppressWarnings("rawtypes")
280        public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
281            if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
282                transport = new WriteTimeoutFilter(transport);
283                String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
284                if (soWriteTimeout!=null) {
285                    ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
286                }
287            }
288            IntrospectionSupport.setProperties(transport, options);
289            return transport;
290        }
291    
292        @SuppressWarnings("rawtypes")
293        protected String getOption(Map options, String key, String def) {
294            String rc = (String) options.remove(key);
295            if( rc == null ) {
296                rc = def;
297            }
298            return rc;
299        }
300    }