001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import org.apache.activemq.util.FactoryFinder;
020    import org.apache.activemq.util.IOExceptionSupport;
021    import org.apache.activemq.util.IntrospectionSupport;
022    import org.apache.activemq.util.URISupport;
023    import org.apache.activemq.wireformat.WireFormat;
024    import org.apache.activemq.wireformat.WireFormatFactory;
025    
026    import java.io.IOException;
027    import java.net.MalformedURLException;
028    import java.net.URI;
029    import java.net.URISyntaxException;
030    import java.net.UnknownHostException;
031    import java.util.HashMap;
032    import java.util.Map;
033    import java.util.concurrent.ConcurrentHashMap;
034    import java.util.concurrent.Executor;
035    
036    public abstract class TransportFactory {
037    
038        private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
039        private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
040        private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
041    
042        private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
043        private static final String THREAD_NAME_FILTER = "threadName";
044    
045        public abstract TransportServer doBind(URI location) throws IOException;
046    
047        public Transport doConnect(URI location, Executor ex) throws Exception {
048            return doConnect(location);
049        }
050    
051        public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
052            return doCompositeConnect(location);
053        }
054    
055        /**
056         * Creates a normal transport.
057         *
058         * @param location
059         * @return the transport
060         * @throws Exception
061         */
062        public static Transport connect(URI location) throws Exception {
063            TransportFactory tf = findTransportFactory(location);
064            return tf.doConnect(location);
065        }
066    
067        /**
068         * Creates a normal transport.
069         *
070         * @param location
071         * @param ex
072         * @return the transport
073         * @throws Exception
074         */
075        public static Transport connect(URI location, Executor ex) throws Exception {
076            TransportFactory tf = findTransportFactory(location);
077            return tf.doConnect(location, ex);
078        }
079    
080        /**
081         * Creates a slimmed down transport that is more efficient so that it can be
082         * used by composite transports like reliable and HA.
083         *
084         * @param location
085         * @return the Transport
086         * @throws Exception
087         */
088        public static Transport compositeConnect(URI location) throws Exception {
089            TransportFactory tf = findTransportFactory(location);
090            return tf.doCompositeConnect(location);
091        }
092    
093        /**
094         * Creates a slimmed down transport that is more efficient so that it can be
095         * used by composite transports like reliable and HA.
096         *
097         * @param location
098         * @param ex
099         * @return the Transport
100         * @throws Exception
101         */
102        public static Transport compositeConnect(URI location, Executor ex) throws Exception {
103            TransportFactory tf = findTransportFactory(location);
104            return tf.doCompositeConnect(location, ex);
105        }
106    
107        public static TransportServer bind(URI location) throws IOException {
108            TransportFactory tf = findTransportFactory(location);
109            return tf.doBind(location);
110        }
111    
112        public Transport doConnect(URI location) throws Exception {
113            try {
114                Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
115                WireFormat wf = createWireFormat(options);
116                Transport transport = createTransport(location, wf);
117                Transport rc = configure(transport, wf, options);
118                if (!options.isEmpty()) {
119                    throw new IllegalArgumentException("Invalid connect parameters: " + options);
120                }
121                return rc;
122            } catch (URISyntaxException e) {
123                throw IOExceptionSupport.create(e);
124            }
125        }
126    
127        public Transport doCompositeConnect(URI location) throws Exception {
128            try {
129                Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
130                WireFormat wf = createWireFormat(options);
131                Transport transport = createTransport(location, wf);
132                Transport rc = compositeConfigure(transport, wf, options);
133                if (!options.isEmpty()) {
134                    throw new IllegalArgumentException("Invalid connect parameters: " + options);
135                }
136                return rc;
137            } catch (URISyntaxException e) {
138                throw IOExceptionSupport.create(e);
139            }
140        }
141    
142         /**
143          * Allow registration of a transport factory without wiring via META-INF classes
144         * @param scheme
145         * @param tf
146         */
147        public static void registerTransportFactory(String scheme, TransportFactory tf) {
148            TRANSPORT_FACTORYS.put(scheme, tf);
149          }
150    
151        /**
152         * Factory method to create a new transport
153         *
154         * @throws IOException
155         * @throws UnknownHostException
156         */
157        protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
158            throw new IOException("createTransport() method not implemented!");
159        }
160    
161        /**
162         * @param location
163         * @return
164         * @throws IOException
165         */
166        public static TransportFactory findTransportFactory(URI location) throws IOException {
167            String scheme = location.getScheme();
168            if (scheme == null) {
169                throw new IOException("Transport not scheme specified: [" + location + "]");
170            }
171            TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
172            if (tf == null) {
173                // Try to load if from a META-INF property.
174                try {
175                    tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
176                    TRANSPORT_FACTORYS.put(scheme, tf);
177                } catch (Throwable e) {
178                    throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
179                }
180            }
181            return tf;
182        }
183    
184        protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
185            WireFormatFactory factory = createWireFormatFactory(options);
186            WireFormat format = factory.createWireFormat();
187            return format;
188        }
189    
190        protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
191            String wireFormat = (String)options.remove("wireFormat");
192            if (wireFormat == null) {
193                wireFormat = getDefaultWireFormatType();
194            }
195    
196            try {
197                WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
198                IntrospectionSupport.setProperties(wff, options, "wireFormat.");
199                return wff;
200            } catch (Throwable e) {
201                throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
202            }
203        }
204    
205        protected String getDefaultWireFormatType() {
206            return "default";
207        }
208    
209        /**
210         * Fully configures and adds all need transport filters so that the
211         * transport can be used by the JMS client.
212         *
213         * @param transport
214         * @param wf
215         * @param options
216         * @return
217         * @throws Exception
218         */
219        @SuppressWarnings("rawtypes")
220        public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
221            transport = compositeConfigure(transport, wf, options);
222    
223            transport = new MutexTransport(transport);
224            transport = new ResponseCorrelator(transport);
225    
226            return transport;
227        }
228    
229        /**
230         * Fully configures and adds all need transport filters so that the
231         * transport can be used by the ActiveMQ message broker. The main difference
232         * between this and the configure() method is that the broker does not issue
233         * requests to the client so the ResponseCorrelator is not needed.
234         *
235         * @param transport
236         * @param format
237         * @param options
238         * @return
239         * @throws Exception
240         */
241        @SuppressWarnings("rawtypes")
242        public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
243            if (options.containsKey(THREAD_NAME_FILTER)) {
244                transport = new ThreadNameFilter(transport);
245            }
246            transport = compositeConfigure(transport, format, options);
247            transport = new MutexTransport(transport);
248            return transport;
249        }
250    
251        /**
252         * Similar to configure(...) but this avoid adding in the MutexTransport and
253         * ResponseCorrelator transport layers so that the resulting transport can
254         * more efficiently be used as part of a composite transport.
255         *
256         * @param transport
257         * @param format
258         * @param options
259         * @return
260         */
261        @SuppressWarnings("rawtypes")
262        public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
263            if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
264                transport = new WriteTimeoutFilter(transport);
265                String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
266                if (soWriteTimeout!=null) {
267                    ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
268                }
269            }
270            IntrospectionSupport.setProperties(transport, options);
271            return transport;
272        }
273    
274        @SuppressWarnings("rawtypes")
275        protected String getOption(Map options, String key, String def) {
276            String rc = (String) options.remove(key);
277            if( rc == null ) {
278                rc = def;
279            }
280            return rc;
281        }
282    }