001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.IOException;
020 import java.net.InetAddress;
021 import java.net.InetSocketAddress;
022 import java.net.ServerSocket;
023 import java.net.Socket;
024 import java.net.SocketException;
025 import java.net.SocketTimeoutException;
026 import java.net.URI;
027 import java.net.URISyntaxException;
028 import java.net.UnknownHostException;
029 import java.util.HashMap;
030 import java.util.concurrent.BlockingQueue;
031 import java.util.concurrent.LinkedBlockingQueue;
032 import java.util.concurrent.TimeUnit;
033
034 import javax.net.ServerSocketFactory;
035
036 import org.apache.activemq.Service;
037 import org.apache.activemq.ThreadPriorities;
038 import org.apache.activemq.TransportLoggerSupport;
039 import org.apache.activemq.command.BrokerInfo;
040 import org.apache.activemq.openwire.OpenWireFormatFactory;
041 import org.apache.activemq.transport.Transport;
042 import org.apache.activemq.transport.TransportServer;
043 import org.apache.activemq.transport.TransportServerThreadSupport;
044 import org.apache.activemq.util.IOExceptionSupport;
045 import org.apache.activemq.util.InetAddressUtil;
046 import org.apache.activemq.util.IntrospectionSupport;
047 import org.apache.activemq.util.ServiceListener;
048 import org.apache.activemq.util.ServiceStopper;
049 import org.apache.activemq.util.ServiceSupport;
050 import org.apache.activemq.wireformat.WireFormat;
051 import org.apache.activemq.wireformat.WireFormatFactory;
052 import org.slf4j.Logger;
053 import org.slf4j.LoggerFactory;
054
055 /**
056 * A TCP based implementation of {@link TransportServer}
057 *
058 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
059 *
060 */
061
062 public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
063
064 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
065 protected ServerSocket serverSocket;
066 protected int backlog = 5000;
067 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
068 protected final TcpTransportFactory transportFactory;
069 protected long maxInactivityDuration = 30000;
070 protected long maxInactivityDurationInitalDelay = 10000;
071 protected int minmumWireFormatVersion;
072 protected boolean useQueueForAccept=true;
073
074 /**
075 * trace=true -> the Transport stack where this TcpTransport
076 * object will be, will have a TransportLogger layer
077 * trace=false -> the Transport stack where this TcpTransport
078 * object will be, will NOT have a TransportLogger layer, and therefore
079 * will never be able to print logging messages.
080 * This parameter is most probably set in Connection or TransportConnector URIs.
081 */
082 protected boolean trace = false;
083
084 protected int soTimeout = 0;
085 protected int socketBufferSize = 64 * 1024;
086 protected int connectionTimeout = 30000;
087
088 /**
089 * Name of the LogWriter implementation to use.
090 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
091 * This parameter is most probably set in Connection or TransportConnector URIs.
092 */
093 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
094 /**
095 * Specifies if the TransportLogger will be manageable by JMX or not.
096 * Also, as long as there is at least 1 TransportLogger which is manageable,
097 * a TransportLoggerControl MBean will me created.
098 */
099 protected boolean dynamicManagement = false;
100 /**
101 * startLogging=true -> the TransportLogger object of the Transport stack
102 * will initially write messages to the log.
103 * startLogging=false -> the TransportLogger object of the Transport stack
104 * will initially NOT write messages to the log.
105 * This parameter only has an effect if trace == true.
106 * This parameter is most probably set in Connection or TransportConnector URIs.
107 */
108 protected boolean startLogging = true;
109 protected final ServerSocketFactory serverSocketFactory;
110 protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
111 protected Thread socketHandlerThread;
112 /**
113 * The maximum number of sockets allowed for this server
114 */
115 protected int maximumConnections = Integer.MAX_VALUE;
116 protected int currentTransportCount=0;
117
118 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
119 super(location);
120 this.transportFactory = transportFactory;
121 this.serverSocketFactory = serverSocketFactory;
122 }
123
124 public void bind() throws IOException {
125 URI bind = getBindLocation();
126
127 String host = bind.getHost();
128 host = (host == null || host.length() == 0) ? "localhost" : host;
129 InetAddress addr = InetAddress.getByName(host);
130
131 try {
132 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
133 configureServerSocket(this.serverSocket);
134 } catch (IOException e) {
135 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
136 }
137 try {
138 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
139 .getFragment()));
140 } catch (URISyntaxException e) {
141
142 // it could be that the host name contains invalid characters such
143 // as _ on unix platforms
144 // so lets try use the IP address instead
145 try {
146 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
147 } catch (URISyntaxException e2) {
148 throw IOExceptionSupport.create(e2);
149 }
150 }
151 }
152
153 private void configureServerSocket(ServerSocket socket) throws SocketException {
154 socket.setSoTimeout(2000);
155 if (transportOptions != null) {
156 IntrospectionSupport.setProperties(socket, transportOptions);
157 }
158 }
159
160 /**
161 * @return Returns the wireFormatFactory.
162 */
163 public WireFormatFactory getWireFormatFactory() {
164 return wireFormatFactory;
165 }
166
167 /**
168 * @param wireFormatFactory The wireFormatFactory to set.
169 */
170 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
171 this.wireFormatFactory = wireFormatFactory;
172 }
173
174 /**
175 * Associates a broker info with the transport server so that the transport
176 * can do discovery advertisements of the broker.
177 *
178 * @param brokerInfo
179 */
180 public void setBrokerInfo(BrokerInfo brokerInfo) {
181 }
182
183 public long getMaxInactivityDuration() {
184 return maxInactivityDuration;
185 }
186
187 public void setMaxInactivityDuration(long maxInactivityDuration) {
188 this.maxInactivityDuration = maxInactivityDuration;
189 }
190
191 public long getMaxInactivityDurationInitalDelay() {
192 return this.maxInactivityDurationInitalDelay;
193 }
194
195 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
196 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
197 }
198
199 public int getMinmumWireFormatVersion() {
200 return minmumWireFormatVersion;
201 }
202
203 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
204 this.minmumWireFormatVersion = minmumWireFormatVersion;
205 }
206
207 public boolean isTrace() {
208 return trace;
209 }
210
211 public void setTrace(boolean trace) {
212 this.trace = trace;
213 }
214
215 public String getLogWriterName() {
216 return logWriterName;
217 }
218
219 public void setLogWriterName(String logFormat) {
220 this.logWriterName = logFormat;
221 }
222
223 public boolean isDynamicManagement() {
224 return dynamicManagement;
225 }
226
227 public void setDynamicManagement(boolean useJmx) {
228 this.dynamicManagement = useJmx;
229 }
230
231 public boolean isStartLogging() {
232 return startLogging;
233 }
234
235 public void setStartLogging(boolean startLogging) {
236 this.startLogging = startLogging;
237 }
238
239 /**
240 * @return the backlog
241 */
242 public int getBacklog() {
243 return backlog;
244 }
245
246 /**
247 * @param backlog the backlog to set
248 */
249 public void setBacklog(int backlog) {
250 this.backlog = backlog;
251 }
252
253 /**
254 * @return the useQueueForAccept
255 */
256 public boolean isUseQueueForAccept() {
257 return useQueueForAccept;
258 }
259
260 /**
261 * @param useQueueForAccept the useQueueForAccept to set
262 */
263 public void setUseQueueForAccept(boolean useQueueForAccept) {
264 this.useQueueForAccept = useQueueForAccept;
265 }
266
267 /**
268 * pull Sockets from the ServerSocket
269 */
270 public void run() {
271 while (!isStopped()) {
272 Socket socket = null;
273 try {
274 socket = serverSocket.accept();
275 if (socket != null) {
276 if (isStopped() || getAcceptListener() == null) {
277 socket.close();
278 } else {
279 if (useQueueForAccept) {
280 socketQueue.put(socket);
281 }else {
282 handleSocket(socket);
283 }
284 }
285 }
286 } catch (SocketTimeoutException ste) {
287 // expect this to happen
288 } catch (Exception e) {
289 if (!isStopping()) {
290 onAcceptError(e);
291 } else if (!isStopped()) {
292 LOG.warn("run()", e);
293 onAcceptError(e);
294 }
295 }
296 }
297 }
298
299 /**
300 * Allow derived classes to override the Transport implementation that this
301 * transport server creates.
302 *
303 * @param socket
304 * @param format
305 * @return
306 * @throws IOException
307 */
308 protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
309 return new TcpTransport(format, socket);
310 }
311
312 /**
313 * @return pretty print of this
314 */
315 public String toString() {
316 return "" + getBindLocation();
317 }
318
319 /**
320 * @param socket
321 * @param inetAddress
322 * @return real hostName
323 * @throws UnknownHostException
324 */
325 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
326 String result = null;
327 if (socket.isBound()) {
328 if (socket.getInetAddress().isAnyLocalAddress()) {
329 // make it more human readable and useful, an alternative to 0.0.0.0
330 result = InetAddressUtil.getLocalHostName();
331 } else {
332 result = socket.getInetAddress().getCanonicalHostName();
333 }
334 } else {
335 result = bindAddress.getCanonicalHostName();
336 }
337 return result;
338 }
339
340 protected void doStart() throws Exception {
341 if(useQueueForAccept) {
342 Runnable run = new Runnable() {
343 public void run() {
344 try {
345 while (!isStopped() && !isStopping()) {
346 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
347 if (sock != null) {
348 handleSocket(sock);
349 }
350 }
351
352 } catch (InterruptedException e) {
353 LOG.info("socketQueue interuppted - stopping");
354 if (!isStopping()) {
355 onAcceptError(e);
356 }
357 }
358
359 }
360
361 };
362 socketHandlerThread = new Thread(null, run,
363 "ActiveMQ Transport Server Thread Handler: " + toString(),
364 getStackSize());
365 socketHandlerThread.setDaemon(true);
366 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
367 socketHandlerThread.start();
368 }
369 super.doStart();
370
371 }
372
373 protected void doStop(ServiceStopper stopper) throws Exception {
374 super.doStop(stopper);
375 if (serverSocket != null) {
376 serverSocket.close();
377 }
378 }
379
380 public InetSocketAddress getSocketAddress() {
381 return (InetSocketAddress)serverSocket.getLocalSocketAddress();
382 }
383
384 protected final void handleSocket(Socket socket) {
385 try {
386 if (this.currentTransportCount >= this.maximumConnections) {
387 throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
388 "number of allowed client connections. See the 'maximumConnections' " +
389 "property on the TCP transport configuration URI in the ActiveMQ " +
390 "configuration file (e.g., activemq.xml)");
391
392 } else {
393 HashMap<String, Object> options = new HashMap<String, Object>();
394 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
395 options.put("maxInactivityDurationInitalDelay",
396 Long.valueOf(maxInactivityDurationInitalDelay));
397 options.put("minmumWireFormatVersion",
398 Integer.valueOf(minmumWireFormatVersion));
399 options.put("trace", Boolean.valueOf(trace));
400 options.put("soTimeout", Integer.valueOf(soTimeout));
401 options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
402 options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
403 options.put("logWriterName", logWriterName);
404 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
405 options.put("startLogging", Boolean.valueOf(startLogging));
406 options.putAll(transportOptions);
407
408 WireFormat format = wireFormatFactory.createWireFormat();
409 Transport transport = createTransport(socket, format);
410
411 if (transport instanceof ServiceSupport) {
412 ((ServiceSupport) transport).addServiceListener(this);
413 }
414
415 Transport configuredTransport =
416 transportFactory.serverConfigure( transport, format, options);
417
418 getAcceptListener().onAccept(configuredTransport);
419 }
420 } catch (SocketTimeoutException ste) {
421 // expect this to happen
422 } catch (Exception e) {
423 if (!isStopping()) {
424 onAcceptError(e);
425 } else if (!isStopped()) {
426 LOG.warn("run()", e);
427 onAcceptError(e);
428 }
429 }
430
431 }
432
433 public int getSoTimeout() {
434 return soTimeout;
435 }
436
437 public void setSoTimeout(int soTimeout) {
438 this.soTimeout = soTimeout;
439 }
440
441 public int getSocketBufferSize() {
442 return socketBufferSize;
443 }
444
445 public void setSocketBufferSize(int socketBufferSize) {
446 this.socketBufferSize = socketBufferSize;
447 }
448
449 public int getConnectionTimeout() {
450 return connectionTimeout;
451 }
452
453 public void setConnectionTimeout(int connectionTimeout) {
454 this.connectionTimeout = connectionTimeout;
455 }
456
457 /**
458 * @return the maximumConnections
459 */
460 public int getMaximumConnections() {
461 return maximumConnections;
462 }
463
464 /**
465 * @param maximumConnections the maximumConnections to set
466 */
467 public void setMaximumConnections(int maximumConnections) {
468 this.maximumConnections = maximumConnections;
469 }
470
471 public void started(Service service) {
472 this.currentTransportCount++;
473 }
474
475 public void stopped(Service service) {
476 this.currentTransportCount--;
477 }
478
479 @Override
480 public boolean isSslServer() {
481 return false;
482 }
483 }