001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.net.MalformedURLException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.net.UnknownHostException;
024 import java.util.HashMap;
025 import java.util.Map;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.Executor;
028
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.BrokerServiceAware;
031 import org.apache.activemq.broker.SslContext;
032 import org.apache.activemq.util.FactoryFinder;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.util.IntrospectionSupport;
035 import org.apache.activemq.util.URISupport;
036 import org.apache.activemq.wireformat.WireFormat;
037 import org.apache.activemq.wireformat.WireFormatFactory;
038
039 public abstract class TransportFactory {
040
041 private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
042 private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
043 private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
044
045 private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
046 private static final String THREAD_NAME_FILTER = "threadName";
047
048 public abstract TransportServer doBind(URI location) throws IOException;
049
050 public Transport doConnect(URI location, Executor ex) throws Exception {
051 return doConnect(location);
052 }
053
054 public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
055 return doCompositeConnect(location);
056 }
057
058 /**
059 * Creates a normal transport.
060 *
061 * @param location
062 * @return the transport
063 * @throws Exception
064 */
065 public static Transport connect(URI location) throws Exception {
066 TransportFactory tf = findTransportFactory(location);
067 return tf.doConnect(location);
068 }
069
070 /**
071 * Creates a normal transport.
072 *
073 * @param location
074 * @param ex
075 * @return the transport
076 * @throws Exception
077 */
078 public static Transport connect(URI location, Executor ex) throws Exception {
079 TransportFactory tf = findTransportFactory(location);
080 return tf.doConnect(location, ex);
081 }
082
083 /**
084 * Creates a slimmed down transport that is more efficient so that it can be
085 * used by composite transports like reliable and HA.
086 *
087 * @param location
088 * @return the Transport
089 * @throws Exception
090 */
091 public static Transport compositeConnect(URI location) throws Exception {
092 TransportFactory tf = findTransportFactory(location);
093 return tf.doCompositeConnect(location);
094 }
095
096 /**
097 * Creates a slimmed down transport that is more efficient so that it can be
098 * used by composite transports like reliable and HA.
099 *
100 * @param location
101 * @param ex
102 * @return the Transport
103 * @throws Exception
104 */
105 public static Transport compositeConnect(URI location, Executor ex) throws Exception {
106 TransportFactory tf = findTransportFactory(location);
107 return tf.doCompositeConnect(location, ex);
108 }
109
110 public static TransportServer bind(URI location) throws IOException {
111 TransportFactory tf = findTransportFactory(location);
112 return tf.doBind(location);
113 }
114
115 public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
116 TransportFactory tf = findTransportFactory(location);
117 if( brokerService!=null && tf instanceof BrokerServiceAware ) {
118 ((BrokerServiceAware)tf).setBrokerService(brokerService);
119 }
120 try {
121 if( brokerService!=null ) {
122 SslContext.setCurrentSslContext(brokerService.getSslContext());
123 }
124 return tf.doBind(location);
125 } finally {
126 SslContext.setCurrentSslContext(null);
127 }
128 }
129
130 public Transport doConnect(URI location) throws Exception {
131 try {
132 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
133 WireFormat wf = createWireFormat(options);
134 Transport transport = createTransport(location, wf);
135 Transport rc = configure(transport, wf, options);
136 if (!options.isEmpty()) {
137 throw new IllegalArgumentException("Invalid connect parameters: " + options);
138 }
139 return rc;
140 } catch (URISyntaxException e) {
141 throw IOExceptionSupport.create(e);
142 }
143 }
144
145 public Transport doCompositeConnect(URI location) throws Exception {
146 try {
147 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
148 WireFormat wf = createWireFormat(options);
149 Transport transport = createTransport(location, wf);
150 Transport rc = compositeConfigure(transport, wf, options);
151 if (!options.isEmpty()) {
152 throw new IllegalArgumentException("Invalid connect parameters: " + options);
153 }
154 return rc;
155 } catch (URISyntaxException e) {
156 throw IOExceptionSupport.create(e);
157 }
158 }
159
160 /**
161 * Allow registration of a transport factory without wiring via META-INF classes
162 * @param scheme
163 * @param tf
164 */
165 public static void registerTransportFactory(String scheme, TransportFactory tf) {
166 TRANSPORT_FACTORYS.put(scheme, tf);
167 }
168
169 /**
170 * Factory method to create a new transport
171 *
172 * @throws IOException
173 * @throws UnknownHostException
174 */
175 protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
176 throw new IOException("createTransport() method not implemented!");
177 }
178
179 /**
180 * @param location
181 * @return
182 * @throws IOException
183 */
184 private static TransportFactory findTransportFactory(URI location) throws IOException {
185 String scheme = location.getScheme();
186 if (scheme == null) {
187 throw new IOException("Transport not scheme specified: [" + location + "]");
188 }
189 TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
190 if (tf == null) {
191 // Try to load if from a META-INF property.
192 try {
193 tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
194 TRANSPORT_FACTORYS.put(scheme, tf);
195 } catch (Throwable e) {
196 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
197 }
198 }
199 return tf;
200 }
201
202 protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
203 WireFormatFactory factory = createWireFormatFactory(options);
204 WireFormat format = factory.createWireFormat();
205 return format;
206 }
207
208 protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
209 String wireFormat = (String)options.remove("wireFormat");
210 if (wireFormat == null) {
211 wireFormat = getDefaultWireFormatType();
212 }
213
214 try {
215 WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
216 IntrospectionSupport.setProperties(wff, options, "wireFormat.");
217 return wff;
218 } catch (Throwable e) {
219 throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
220 }
221 }
222
223 protected String getDefaultWireFormatType() {
224 return "default";
225 }
226
227 /**
228 * Fully configures and adds all need transport filters so that the
229 * transport can be used by the JMS client.
230 *
231 * @param transport
232 * @param wf
233 * @param options
234 * @return
235 * @throws Exception
236 */
237 @SuppressWarnings("rawtypes")
238 public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
239 transport = compositeConfigure(transport, wf, options);
240
241 transport = new MutexTransport(transport);
242 transport = new ResponseCorrelator(transport);
243
244 return transport;
245 }
246
247 /**
248 * Fully configures and adds all need transport filters so that the
249 * transport can be used by the ActiveMQ message broker. The main difference
250 * between this and the configure() method is that the broker does not issue
251 * requests to the client so the ResponseCorrelator is not needed.
252 *
253 * @param transport
254 * @param format
255 * @param options
256 * @return
257 * @throws Exception
258 */
259 @SuppressWarnings("rawtypes")
260 public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
261 if (options.containsKey(THREAD_NAME_FILTER)) {
262 transport = new ThreadNameFilter(transport);
263 }
264 transport = compositeConfigure(transport, format, options);
265 transport = new MutexTransport(transport);
266 return transport;
267 }
268
269 /**
270 * Similar to configure(...) but this avoid adding in the MutexTransport and
271 * ResponseCorrelator transport layers so that the resulting transport can
272 * more efficiently be used as part of a composite transport.
273 *
274 * @param transport
275 * @param format
276 * @param options
277 * @return
278 */
279 @SuppressWarnings("rawtypes")
280 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
281 if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
282 transport = new WriteTimeoutFilter(transport);
283 String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
284 if (soWriteTimeout!=null) {
285 ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
286 }
287 }
288 IntrospectionSupport.setProperties(transport, options);
289 return transport;
290 }
291
292 @SuppressWarnings("rawtypes")
293 protected String getOption(Map options, String key, String def) {
294 String rc = (String) options.remove(key);
295 if( rc == null ) {
296 rc = def;
297 }
298 return rc;
299 }
300 }