001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.net.UnknownHostException;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import javax.net.ServerSocketFactory;
027 import javax.net.SocketFactory;
028
029 import org.apache.activemq.TransportLoggerSupport;
030 import org.apache.activemq.openwire.OpenWireFormat;
031 import org.apache.activemq.transport.*;
032 import org.apache.activemq.util.IOExceptionSupport;
033 import org.apache.activemq.util.IntrospectionSupport;
034 import org.apache.activemq.util.URISupport;
035 import org.apache.activemq.wireformat.WireFormat;
036 import org.slf4j.Logger;
037 import org.slf4j.LoggerFactory;
038
039 /**
040 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
041 *
042 */
043 public class TcpTransportFactory extends TransportFactory {
044 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportFactory.class);
045
046 public TransportServer doBind(final URI location) throws IOException {
047 try {
048 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
049
050 ServerSocketFactory serverSocketFactory = createServerSocketFactory();
051 TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
052 server.setWireFormatFactory(createWireFormatFactory(options));
053 IntrospectionSupport.setProperties(server, options);
054 Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
055 server.setTransportOption(transportOptions);
056 server.bind();
057
058 return server;
059 } catch (URISyntaxException e) {
060 throw IOExceptionSupport.create(e);
061 }
062 }
063
064 /**
065 * Allows subclasses of TcpTransportFactory to create custom instances of
066 * TcpTransportServer.
067 *
068 * @param location
069 * @param serverSocketFactory
070 * @return
071 * @throws IOException
072 * @throws URISyntaxException
073 */
074 protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
075 return new TcpTransportServer(this, location, serverSocketFactory);
076 }
077
078 @SuppressWarnings("rawtypes")
079 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
080
081 TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
082 IntrospectionSupport.setProperties(tcpTransport, options);
083
084 Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
085 tcpTransport.setSocketOptions(socketOptions);
086
087 if (tcpTransport.isTrace()) {
088 try {
089 transport = TransportLoggerSupport.createTransportLogger(transport, tcpTransport.getLogWriterName(), tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
090 } catch (Throwable e) {
091 LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
092 }
093 }
094
095 boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
096 if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
097 transport = createInactivityMonitor(transport, format);
098 IntrospectionSupport.setProperties(transport, options);
099 }
100
101 // Only need the WireFormatNegotiator if using openwire
102 if (format instanceof OpenWireFormat) {
103 transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
104 }
105
106 return super.compositeConfigure(transport, format, options);
107 }
108
109
110 /**
111 * Returns true if the inactivity monitor should be used on the transport
112 */
113 protected boolean isUseInactivityMonitor(Transport transport) {
114 return true;
115 }
116
117 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
118 URI localLocation = null;
119 String path = location.getPath();
120 // see if the path is a local URI location
121 if (path != null && path.length() > 0) {
122 int localPortIndex = path.indexOf(':');
123 try {
124 Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
125 String localString = location.getScheme() + ":/" + path;
126 localLocation = new URI(localString);
127 } catch (Exception e) {
128 LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage());
129 if(LOG.isDebugEnabled()) {
130 LOG.debug("Failure detail", e);
131 }
132 }
133 }
134 SocketFactory socketFactory = createSocketFactory();
135 return createTcpTransport(wf, socketFactory, location, localLocation);
136 }
137
138 /**
139 * Allows subclasses of TcpTransportFactory to provide a create custom
140 * TcpTransport intances.
141 *
142 * @param location
143 * @param wf
144 * @param socketFactory
145 * @param localLocation
146 * @return
147 * @throws UnknownHostException
148 * @throws IOException
149 */
150 protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
151 return new TcpTransport(wf, socketFactory, location, localLocation);
152 }
153
154 protected ServerSocketFactory createServerSocketFactory() throws IOException {
155 return ServerSocketFactory.getDefault();
156 }
157
158 protected SocketFactory createSocketFactory() throws IOException {
159 return SocketFactory.getDefault();
160 }
161
162 protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
163 return new InactivityMonitor(transport, format);
164 }
165 }