001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.InetAddress; 021import java.net.InetSocketAddress; 022import java.net.ServerSocket; 023import java.net.Socket; 024import java.net.SocketException; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.UnknownHostException; 029import java.nio.channels.ClosedChannelException; 030import java.nio.channels.SelectionKey; 031import java.nio.channels.Selector; 032import java.nio.channels.ServerSocketChannel; 033import java.nio.channels.SocketChannel; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.Set; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.LinkedBlockingQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import javax.net.ServerSocketFactory; 043import javax.net.ssl.SSLParameters; 044import javax.net.ssl.SSLServerSocket; 045 046import org.apache.activemq.Service; 047import org.apache.activemq.ThreadPriorities; 048import org.apache.activemq.TransportLoggerSupport; 049import org.apache.activemq.command.BrokerInfo; 050import org.apache.activemq.openwire.OpenWireFormatFactory; 051import org.apache.activemq.transport.Transport; 052import org.apache.activemq.transport.TransportFactory; 053import org.apache.activemq.transport.TransportServer; 054import org.apache.activemq.transport.TransportServerThreadSupport; 055import org.apache.activemq.util.IOExceptionSupport; 056import org.apache.activemq.util.InetAddressUtil; 057import org.apache.activemq.util.IntrospectionSupport; 058import org.apache.activemq.util.ServiceListener; 059import org.apache.activemq.util.ServiceStopper; 060import org.apache.activemq.util.ServiceSupport; 061import org.apache.activemq.wireformat.WireFormat; 062import org.apache.activemq.wireformat.WireFormatFactory; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * A TCP based implementation of {@link TransportServer} 068 */ 069public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { 070 071 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); 072 073 protected volatile ServerSocket serverSocket; 074 protected volatile Selector selector; 075 protected int backlog = 5000; 076 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 077 protected final TcpTransportFactory transportFactory; 078 protected long maxInactivityDuration = 30000; 079 protected long maxInactivityDurationInitalDelay = 10000; 080 protected int minmumWireFormatVersion; 081 protected boolean useQueueForAccept = true; 082 protected boolean allowLinkStealing; 083 protected boolean verifyHostName = false; 084 085 /** 086 * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer 087 * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, 088 * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or 089 * TransportConnector URIs. 090 */ 091 protected boolean trace = false; 092 093 protected int soTimeout = 0; 094 protected int socketBufferSize = 64 * 1024; 095 protected int connectionTimeout = 30000; 096 097 /** 098 * Name of the LogWriter implementation to use. Names are mapped to classes in the 099 * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably 100 * set in Connection or TransportConnector URIs. 101 */ 102 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; 103 104 /** 105 * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 106 * TransportLogger which is manageable, a TransportLoggerControl MBean will me created. 107 */ 108 protected boolean dynamicManagement = false; 109 110 /** 111 * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. 112 * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the 113 * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or 114 * TransportConnector URIs. 115 */ 116 protected boolean startLogging = true; 117 protected int jmxPort = TransportLoggerSupport.defaultJmxPort; 118 protected final ServerSocketFactory serverSocketFactory; 119 protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); 120 protected Thread socketHandlerThread; 121 122 /** 123 * The maximum number of sockets allowed for this server 124 */ 125 protected int maximumConnections = Integer.MAX_VALUE; 126 protected final AtomicInteger currentTransportCount = new AtomicInteger(); 127 128 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, 129 URISyntaxException { 130 super(location); 131 this.transportFactory = transportFactory; 132 this.serverSocketFactory = serverSocketFactory; 133 } 134 135 public void bind() throws IOException { 136 URI bind = getBindLocation(); 137 138 String host = bind.getHost(); 139 host = (host == null || host.length() == 0) ? "localhost" : host; 140 InetAddress addr = InetAddress.getByName(host); 141 142 try { 143 serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 144 configureServerSocket(serverSocket); 145 } catch (IOException e) { 146 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 147 } 148 try { 149 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), 150 bind.getQuery(), bind.getFragment())); 151 } catch (URISyntaxException e) { 152 // it could be that the host name contains invalid characters such 153 // as _ on unix platforms so lets try use the IP address instead 154 try { 155 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), 156 bind.getQuery(), bind.getFragment())); 157 } catch (URISyntaxException e2) { 158 throw IOExceptionSupport.create(e2); 159 } 160 } 161 } 162 163 private void configureServerSocket(ServerSocket socket) throws SocketException { 164 socket.setSoTimeout(2000); 165 if (transportOptions != null) { 166 167 // If the enabledCipherSuites option is invalid we don't want to ignore it as the call 168 // to SSLServerSocket to configure it has a side effect on the socket rendering it 169 // useless as all suites are enabled many of which are considered as insecure. We 170 // instead trap that option here and throw an exception. We should really consider 171 // all invalid options as breaking and not start the transport but the current design 172 // doesn't really allow for this. 173 // 174 // see: https://issues.apache.org/jira/browse/AMQ-4582 175 // 176 if (socket instanceof SSLServerSocket) { 177 if (transportOptions.containsKey("verifyHostName")) { 178 verifyHostName = Boolean.parseBoolean(transportOptions.get("verifyHostName").toString()); 179 } else { 180 transportOptions.put("verifyHostName", verifyHostName); 181 } 182 183 if (verifyHostName) { 184 SSLParameters sslParams = new SSLParameters(); 185 sslParams.setEndpointIdentificationAlgorithm("HTTPS"); 186 ((SSLServerSocket)this.serverSocket).setSSLParameters(sslParams); 187 } 188 189 if (transportOptions.containsKey("enabledCipherSuites")) { 190 Object cipherSuites = transportOptions.remove("enabledCipherSuites"); 191 192 if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) { 193 throw new SocketException(String.format( 194 "Invalid transport options {enabledCipherSuites=%s}", cipherSuites)); 195 } 196 } 197 198 } 199 200 //AMQ-6599 - don't strip out set properties on the socket as we need to set them 201 //on the Transport as well later 202 IntrospectionSupport.setProperties(socket, transportOptions, false); 203 } 204 } 205 206 /** 207 * @return Returns the wireFormatFactory. 208 */ 209 public WireFormatFactory getWireFormatFactory() { 210 return wireFormatFactory; 211 } 212 213 /** 214 * @param wireFormatFactory 215 * The wireFormatFactory to set. 216 */ 217 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 218 this.wireFormatFactory = wireFormatFactory; 219 } 220 221 /** 222 * Associates a broker info with the transport server so that the transport can do discovery advertisements of the 223 * broker. 224 * 225 * @param brokerInfo 226 */ 227 @Override 228 public void setBrokerInfo(BrokerInfo brokerInfo) { 229 } 230 231 public long getMaxInactivityDuration() { 232 return maxInactivityDuration; 233 } 234 235 public void setMaxInactivityDuration(long maxInactivityDuration) { 236 this.maxInactivityDuration = maxInactivityDuration; 237 } 238 239 public long getMaxInactivityDurationInitalDelay() { 240 return this.maxInactivityDurationInitalDelay; 241 } 242 243 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { 244 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; 245 } 246 247 public int getMinmumWireFormatVersion() { 248 return minmumWireFormatVersion; 249 } 250 251 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 252 this.minmumWireFormatVersion = minmumWireFormatVersion; 253 } 254 255 public boolean isTrace() { 256 return trace; 257 } 258 259 public void setTrace(boolean trace) { 260 this.trace = trace; 261 } 262 263 public String getLogWriterName() { 264 return logWriterName; 265 } 266 267 public void setLogWriterName(String logFormat) { 268 this.logWriterName = logFormat; 269 } 270 271 public boolean isDynamicManagement() { 272 return dynamicManagement; 273 } 274 275 public void setDynamicManagement(boolean useJmx) { 276 this.dynamicManagement = useJmx; 277 } 278 279 public void setJmxPort(int jmxPort) { 280 this.jmxPort = jmxPort; 281 } 282 283 public int getJmxPort() { 284 return jmxPort; 285 } 286 287 public boolean isStartLogging() { 288 return startLogging; 289 } 290 291 public void setStartLogging(boolean startLogging) { 292 this.startLogging = startLogging; 293 } 294 295 /** 296 * @return the backlog 297 */ 298 public int getBacklog() { 299 return backlog; 300 } 301 302 /** 303 * @param backlog 304 * the backlog to set 305 */ 306 public void setBacklog(int backlog) { 307 this.backlog = backlog; 308 } 309 310 /** 311 * @return the useQueueForAccept 312 */ 313 public boolean isUseQueueForAccept() { 314 return useQueueForAccept; 315 } 316 317 /** 318 * @param useQueueForAccept 319 * the useQueueForAccept to set 320 */ 321 public void setUseQueueForAccept(boolean useQueueForAccept) { 322 this.useQueueForAccept = useQueueForAccept; 323 } 324 325 /** 326 * pull Sockets from the ServerSocket 327 */ 328 @Override 329 public void run() { 330 if (!isStopped() && !isStopping()) { 331 final ServerSocket serverSocket = this.serverSocket; 332 if (serverSocket == null) { 333 onAcceptError(new IOException("Server started without a valid ServerSocket")); 334 } 335 336 final ServerSocketChannel channel = serverSocket.getChannel(); 337 if (channel != null) { 338 doRunWithServerSocketChannel(channel); 339 } else { 340 doRunWithServerSocket(serverSocket); 341 } 342 } 343 } 344 345 private void doRunWithServerSocketChannel(final ServerSocketChannel channel) { 346 try { 347 channel.configureBlocking(false); 348 final Selector selector = Selector.open(); 349 350 try { 351 channel.register(selector, SelectionKey.OP_ACCEPT); 352 } catch (ClosedChannelException ex) { 353 try { 354 selector.close(); 355 } catch (IOException ignore) {} 356 357 throw ex; 358 } 359 360 // Update object instance for later cleanup. 361 this.selector = selector; 362 363 while (!isStopped()) { 364 int count = selector.select(10); 365 366 if (count == 0) { 367 continue; 368 } 369 370 Set<SelectionKey> keys = selector.selectedKeys(); 371 372 for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { 373 final SelectionKey key = i.next(); 374 if (key.isAcceptable()) { 375 try { 376 SocketChannel sc = channel.accept(); 377 if (sc != null) { 378 if (isStopped() || getAcceptListener() == null) { 379 sc.close(); 380 } else { 381 if (useQueueForAccept) { 382 socketQueue.put(sc.socket()); 383 } else { 384 handleSocket(sc.socket()); 385 } 386 } 387 } 388 389 } catch (SocketTimeoutException ste) { 390 // expect this to happen 391 } catch (Exception e) { 392 e.printStackTrace(); 393 if (!isStopping()) { 394 onAcceptError(e); 395 } else if (!isStopped()) { 396 LOG.warn("run()", e); 397 onAcceptError(e); 398 } 399 } 400 } 401 i.remove(); 402 } 403 } 404 } catch (IOException ex) { 405 if (!isStopping()) { 406 onAcceptError(ex); 407 } else if (!isStopped()) { 408 LOG.warn("run()", ex); 409 onAcceptError(ex); 410 } 411 } 412 } 413 414 private void doRunWithServerSocket(final ServerSocket serverSocket) { 415 while (!isStopped()) { 416 Socket socket = null; 417 try { 418 socket = serverSocket.accept(); 419 if (socket != null) { 420 if (isStopped() || getAcceptListener() == null) { 421 socket.close(); 422 } else { 423 if (useQueueForAccept) { 424 socketQueue.put(socket); 425 } else { 426 handleSocket(socket); 427 } 428 } 429 } 430 } catch (SocketTimeoutException ste) { 431 // expect this to happen 432 } catch (Exception e) { 433 if (!isStopping()) { 434 onAcceptError(e); 435 } else if (!isStopped()) { 436 LOG.warn("run()", e); 437 onAcceptError(e); 438 } 439 } 440 } 441 } 442 443 /** 444 * Allow derived classes to override the Transport implementation that this transport server creates. 445 * 446 * @param socket 447 * @param format 448 * 449 * @return a new Transport instance. 450 * 451 * @throws IOException 452 */ 453 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 454 return new TcpTransport(format, socket); 455 } 456 457 /** 458 * @return pretty print of this 459 */ 460 @Override 461 public String toString() { 462 return "" + getBindLocation(); 463 } 464 465 /** 466 * @param socket 467 * @param bindAddress 468 * @return real hostName 469 * @throws UnknownHostException 470 */ 471 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { 472 String result = null; 473 if (socket.isBound()) { 474 if (socket.getInetAddress().isAnyLocalAddress()) { 475 // make it more human readable and useful, an alternative to 0.0.0.0 476 result = InetAddressUtil.getLocalHostName(); 477 } else { 478 result = socket.getInetAddress().getCanonicalHostName(); 479 } 480 } else { 481 result = bindAddress.getCanonicalHostName(); 482 } 483 return result; 484 } 485 486 @Override 487 protected void doStart() throws Exception { 488 if (useQueueForAccept) { 489 Runnable run = new Runnable() { 490 @Override 491 public void run() { 492 try { 493 while (!isStopped() && !isStopping()) { 494 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 495 if (sock != null) { 496 try { 497 handleSocket(sock); 498 } catch (Throwable thrown) { 499 if (!isStopping()) { 500 onAcceptError(new Exception(thrown)); 501 } else if (!isStopped()) { 502 LOG.warn("Unexpected error thrown during accept handling: ", thrown); 503 onAcceptError(new Exception(thrown)); 504 } 505 } 506 } 507 } 508 509 } catch (InterruptedException e) { 510 if (!isStopped() || !isStopping()) { 511 LOG.info("socketQueue interrupted - stopping"); 512 onAcceptError(e); 513 } 514 } 515 } 516 }; 517 socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize()); 518 socketHandlerThread.setDaemon(true); 519 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1); 520 socketHandlerThread.start(); 521 } 522 super.doStart(); 523 } 524 525 @Override 526 protected void doStop(ServiceStopper stopper) throws Exception { 527 Exception firstFailure = null; 528 529 try { 530 if (selector != null) { 531 selector.close(); 532 selector = null; 533 } 534 } catch (Exception error) { 535 } 536 537 try { 538 final ServerSocket serverSocket = this.serverSocket; 539 if (serverSocket != null) { 540 this.serverSocket = null; 541 serverSocket.close(); 542 } 543 } catch (Exception error) { 544 firstFailure = error; 545 } 546 547 if (socketHandlerThread != null) { 548 socketHandlerThread.interrupt(); 549 socketHandlerThread = null; 550 } 551 552 try { 553 super.doStop(stopper); 554 } catch (Exception error) { 555 if (firstFailure != null) { 556 firstFailure = error; 557 } 558 } 559 560 if (firstFailure != null) { 561 throw firstFailure; 562 } 563 } 564 565 @Override 566 public InetSocketAddress getSocketAddress() { 567 return (InetSocketAddress) serverSocket.getLocalSocketAddress(); 568 } 569 570 protected void handleSocket(Socket socket) { 571 doHandleSocket(socket); 572 } 573 574 final protected void doHandleSocket(Socket socket) { 575 boolean closeSocket = true; 576 boolean countIncremented = false; 577 try { 578 int currentCount; 579 do { 580 currentCount = currentTransportCount.get(); 581 if (currentCount >= this.maximumConnections) { 582 throw new ExceededMaximumConnectionsException( 583 "Exceeded the maximum number of allowed client connections. See the '" + 584 "maximumConnections' property on the TCP transport configuration URI " + 585 "in the ActiveMQ configuration file (e.g., activemq.xml)"); 586 } 587 588 //Increment this value before configuring the transport 589 //This is necessary because some of the transport servers must read from the 590 //socket during configureTransport() so we want to make sure this value is 591 //accurate as the transport server could pause here waiting for data to be sent from a client 592 } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1)); 593 countIncremented = true; 594 595 HashMap<String, Object> options = new HashMap<String, Object>(); 596 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); 597 options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); 598 options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); 599 options.put("trace", Boolean.valueOf(trace)); 600 options.put("soTimeout", Integer.valueOf(soTimeout)); 601 options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); 602 options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); 603 options.put("logWriterName", logWriterName); 604 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); 605 options.put("startLogging", Boolean.valueOf(startLogging)); 606 options.put("jmxPort", Integer.valueOf(jmxPort)); 607 options.putAll(transportOptions); 608 609 TransportInfo transportInfo = configureTransport(this, socket); 610 closeSocket = false; 611 612 if (transportInfo.transport instanceof ServiceSupport) { 613 ((ServiceSupport) transportInfo.transport).addServiceListener(this); 614 } 615 616 Transport configuredTransport = transportInfo.transportFactory.serverConfigure( 617 transportInfo.transport, transportInfo.format, options); 618 619 getAcceptListener().onAccept(configuredTransport); 620 621 } catch (SocketTimeoutException ste) { 622 // expect this to happen 623 } catch (Exception e) { 624 if (closeSocket) { 625 try { 626 //if closing the socket, only decrement the count it was actually incremented 627 //where it was incremented 628 if (countIncremented) { 629 currentTransportCount.decrementAndGet(); 630 } 631 socket.close(); 632 } catch (Exception ignore) { 633 } 634 } 635 636 if (!isStopping()) { 637 onAcceptError(e); 638 } else if (!isStopped()) { 639 LOG.warn("run()", e); 640 onAcceptError(e); 641 } 642 } 643 } 644 645 protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { 646 WireFormat format = wireFormatFactory.createWireFormat(); 647 Transport transport = createTransport(socket, format); 648 return new TransportInfo(format, transport, transportFactory); 649 } 650 651 protected class TransportInfo { 652 final WireFormat format; 653 final Transport transport; 654 final TransportFactory transportFactory; 655 656 public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) { 657 this.format = format; 658 this.transport = transport; 659 this.transportFactory = transportFactory; 660 } 661 } 662 663 public int getSoTimeout() { 664 return soTimeout; 665 } 666 667 public void setSoTimeout(int soTimeout) { 668 this.soTimeout = soTimeout; 669 } 670 671 public int getSocketBufferSize() { 672 return socketBufferSize; 673 } 674 675 public void setSocketBufferSize(int socketBufferSize) { 676 this.socketBufferSize = socketBufferSize; 677 } 678 679 public int getConnectionTimeout() { 680 return connectionTimeout; 681 } 682 683 public void setConnectionTimeout(int connectionTimeout) { 684 this.connectionTimeout = connectionTimeout; 685 } 686 687 /** 688 * @return the maximumConnections 689 */ 690 public int getMaximumConnections() { 691 return maximumConnections; 692 } 693 694 /** 695 * @param maximumConnections 696 * the maximumConnections to set 697 */ 698 public void setMaximumConnections(int maximumConnections) { 699 this.maximumConnections = maximumConnections; 700 } 701 702 public AtomicInteger getCurrentTransportCount() { 703 return currentTransportCount; 704 } 705 706 @Override 707 public void started(Service service) { 708 } 709 710 @Override 711 public void stopped(Service service) { 712 this.currentTransportCount.decrementAndGet(); 713 } 714 715 @Override 716 public boolean isSslServer() { 717 return false; 718 } 719 720 @Override 721 public boolean isAllowLinkStealing() { 722 return allowLinkStealing; 723 } 724 725 @Override 726 public void setAllowLinkStealing(boolean allowLinkStealing) { 727 this.allowLinkStealing = allowLinkStealing; 728 } 729}