001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.ClosedChannelException;
030import java.nio.channels.SelectionKey;
031import java.nio.channels.Selector;
032import java.nio.channels.ServerSocketChannel;
033import java.nio.channels.SocketChannel;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import javax.net.ServerSocketFactory;
043import javax.net.ssl.SSLParameters;
044import javax.net.ssl.SSLServerSocket;
045
046import org.apache.activemq.Service;
047import org.apache.activemq.ThreadPriorities;
048import org.apache.activemq.TransportLoggerSupport;
049import org.apache.activemq.command.BrokerInfo;
050import org.apache.activemq.openwire.OpenWireFormatFactory;
051import org.apache.activemq.transport.Transport;
052import org.apache.activemq.transport.TransportFactory;
053import org.apache.activemq.transport.TransportServer;
054import org.apache.activemq.transport.TransportServerThreadSupport;
055import org.apache.activemq.util.IOExceptionSupport;
056import org.apache.activemq.util.InetAddressUtil;
057import org.apache.activemq.util.IntrospectionSupport;
058import org.apache.activemq.util.ServiceListener;
059import org.apache.activemq.util.ServiceStopper;
060import org.apache.activemq.util.ServiceSupport;
061import org.apache.activemq.wireformat.WireFormat;
062import org.apache.activemq.wireformat.WireFormatFactory;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A TCP based implementation of {@link TransportServer}
068 */
069public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
070
071    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
072
073    protected volatile ServerSocket serverSocket;
074    protected volatile Selector selector;
075    protected int backlog = 5000;
076    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
077    protected final TcpTransportFactory transportFactory;
078    protected long maxInactivityDuration = 30000;
079    protected long maxInactivityDurationInitalDelay = 10000;
080    protected int minmumWireFormatVersion;
081    protected boolean useQueueForAccept = true;
082    protected boolean allowLinkStealing;
083    protected boolean verifyHostName = false;
084
085    /**
086     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
087     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
088     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
089     * TransportConnector URIs.
090     */
091    protected boolean trace = false;
092
093    protected int soTimeout = 0;
094    protected int socketBufferSize = 64 * 1024;
095    protected int connectionTimeout = 30000;
096
097    /**
098     * Name of the LogWriter implementation to use. Names are mapped to classes in the
099     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
100     * set in Connection or TransportConnector URIs.
101     */
102    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
103
104    /**
105     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
106     * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
107     */
108    protected boolean dynamicManagement = false;
109
110    /**
111     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
112     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
113     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
114     * TransportConnector URIs.
115     */
116    protected boolean startLogging = true;
117    protected int jmxPort = TransportLoggerSupport.defaultJmxPort;
118    protected final ServerSocketFactory serverSocketFactory;
119    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
120    protected Thread socketHandlerThread;
121
122    /**
123     * The maximum number of sockets allowed for this server
124     */
125    protected int maximumConnections = Integer.MAX_VALUE;
126    protected final AtomicInteger currentTransportCount = new AtomicInteger();
127
128    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
129        URISyntaxException {
130        super(location);
131        this.transportFactory = transportFactory;
132        this.serverSocketFactory = serverSocketFactory;
133    }
134
135    public void bind() throws IOException {
136        URI bind = getBindLocation();
137
138        String host = bind.getHost();
139        host = (host == null || host.length() == 0) ? "localhost" : host;
140        InetAddress addr = InetAddress.getByName(host);
141
142        try {
143            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
144            configureServerSocket(serverSocket);
145        } catch (IOException e) {
146            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
147        }
148        try {
149            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
150                bind.getQuery(), bind.getFragment()));
151        } catch (URISyntaxException e) {
152            // it could be that the host name contains invalid characters such
153            // as _ on unix platforms so lets try use the IP address instead
154            try {
155                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
156                    bind.getQuery(), bind.getFragment()));
157            } catch (URISyntaxException e2) {
158                throw IOExceptionSupport.create(e2);
159            }
160        }
161    }
162
163    private void configureServerSocket(ServerSocket socket) throws SocketException {
164        socket.setSoTimeout(2000);
165        if (transportOptions != null) {
166
167            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
168            // to SSLServerSocket to configure it has a side effect on the socket rendering it
169            // useless as all suites are enabled many of which are considered as insecure.  We
170            // instead trap that option here and throw an exception.  We should really consider
171            // all invalid options as breaking and not start the transport but the current design
172            // doesn't really allow for this.
173            //
174            //  see: https://issues.apache.org/jira/browse/AMQ-4582
175            //
176            if (socket instanceof SSLServerSocket) {
177                if (transportOptions.containsKey("verifyHostName")) {
178                    verifyHostName = Boolean.parseBoolean(transportOptions.get("verifyHostName").toString());
179                } else {
180                    transportOptions.put("verifyHostName", verifyHostName);
181                }
182
183                if (verifyHostName) {
184                    SSLParameters sslParams = new SSLParameters();
185                    sslParams.setEndpointIdentificationAlgorithm("HTTPS");
186                    ((SSLServerSocket)this.serverSocket).setSSLParameters(sslParams);
187                }
188
189                if (transportOptions.containsKey("enabledCipherSuites")) {
190                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
191
192                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
193                        throw new SocketException(String.format(
194                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
195                    }
196                }
197
198            }
199
200            //AMQ-6599 - don't strip out set properties on the socket as we need to set them
201            //on the Transport as well later
202            IntrospectionSupport.setProperties(socket, transportOptions, false);
203        }
204    }
205
206    /**
207     * @return Returns the wireFormatFactory.
208     */
209    public WireFormatFactory getWireFormatFactory() {
210        return wireFormatFactory;
211    }
212
213    /**
214     * @param wireFormatFactory
215     *            The wireFormatFactory to set.
216     */
217    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
218        this.wireFormatFactory = wireFormatFactory;
219    }
220
221    /**
222     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
223     * broker.
224     *
225     * @param brokerInfo
226     */
227    @Override
228    public void setBrokerInfo(BrokerInfo brokerInfo) {
229    }
230
231    public long getMaxInactivityDuration() {
232        return maxInactivityDuration;
233    }
234
235    public void setMaxInactivityDuration(long maxInactivityDuration) {
236        this.maxInactivityDuration = maxInactivityDuration;
237    }
238
239    public long getMaxInactivityDurationInitalDelay() {
240        return this.maxInactivityDurationInitalDelay;
241    }
242
243    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
244        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
245    }
246
247    public int getMinmumWireFormatVersion() {
248        return minmumWireFormatVersion;
249    }
250
251    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
252        this.minmumWireFormatVersion = minmumWireFormatVersion;
253    }
254
255    public boolean isTrace() {
256        return trace;
257    }
258
259    public void setTrace(boolean trace) {
260        this.trace = trace;
261    }
262
263    public String getLogWriterName() {
264        return logWriterName;
265    }
266
267    public void setLogWriterName(String logFormat) {
268        this.logWriterName = logFormat;
269    }
270
271    public boolean isDynamicManagement() {
272        return dynamicManagement;
273    }
274
275    public void setDynamicManagement(boolean useJmx) {
276        this.dynamicManagement = useJmx;
277    }
278
279    public void setJmxPort(int jmxPort) {
280        this.jmxPort = jmxPort;
281    }
282
283    public int getJmxPort() {
284        return jmxPort;
285    }
286
287    public boolean isStartLogging() {
288        return startLogging;
289    }
290
291    public void setStartLogging(boolean startLogging) {
292        this.startLogging = startLogging;
293    }
294
295    /**
296     * @return the backlog
297     */
298    public int getBacklog() {
299        return backlog;
300    }
301
302    /**
303     * @param backlog
304     *            the backlog to set
305     */
306    public void setBacklog(int backlog) {
307        this.backlog = backlog;
308    }
309
310    /**
311     * @return the useQueueForAccept
312     */
313    public boolean isUseQueueForAccept() {
314        return useQueueForAccept;
315    }
316
317    /**
318     * @param useQueueForAccept
319     *            the useQueueForAccept to set
320     */
321    public void setUseQueueForAccept(boolean useQueueForAccept) {
322        this.useQueueForAccept = useQueueForAccept;
323    }
324
325    /**
326     * pull Sockets from the ServerSocket
327     */
328    @Override
329    public void run() {
330        if (!isStopped() && !isStopping()) {
331            final ServerSocket serverSocket = this.serverSocket;
332            if (serverSocket == null) {
333                onAcceptError(new IOException("Server started without a valid ServerSocket"));
334            }
335
336            final ServerSocketChannel channel = serverSocket.getChannel();
337            if (channel != null) {
338                doRunWithServerSocketChannel(channel);
339            } else {
340                doRunWithServerSocket(serverSocket);
341            }
342        }
343    }
344
345    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
346        try {
347            channel.configureBlocking(false);
348            final Selector selector = Selector.open();
349
350            try {
351                channel.register(selector, SelectionKey.OP_ACCEPT);
352            } catch (ClosedChannelException ex) {
353                try {
354                    selector.close();
355                } catch (IOException ignore) {}
356
357                throw ex;
358            }
359
360            // Update object instance for later cleanup.
361            this.selector = selector;
362
363            while (!isStopped()) {
364                int count = selector.select(10);
365
366                if (count == 0) {
367                    continue;
368                }
369
370                Set<SelectionKey> keys = selector.selectedKeys();
371
372                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
373                    final SelectionKey key = i.next();
374                    if (key.isAcceptable()) {
375                        try {
376                            SocketChannel sc = channel.accept();
377                            if (sc != null) {
378                                if (isStopped() || getAcceptListener() == null) {
379                                    sc.close();
380                                } else {
381                                    if (useQueueForAccept) {
382                                        socketQueue.put(sc.socket());
383                                    } else {
384                                        handleSocket(sc.socket());
385                                    }
386                                }
387                            }
388
389                        } catch (SocketTimeoutException ste) {
390                            // expect this to happen
391                        } catch (Exception e) {
392                            e.printStackTrace();
393                            if (!isStopping()) {
394                                onAcceptError(e);
395                            } else if (!isStopped()) {
396                                LOG.warn("run()", e);
397                                onAcceptError(e);
398                            }
399                        }
400                    }
401                    i.remove();
402                }
403            }
404        } catch (IOException ex) {
405            if (!isStopping()) {
406                onAcceptError(ex);
407            } else if (!isStopped()) {
408                LOG.warn("run()", ex);
409                onAcceptError(ex);
410            }
411        }
412    }
413
414    private void doRunWithServerSocket(final ServerSocket serverSocket) {
415        while (!isStopped()) {
416            Socket socket = null;
417            try {
418                socket = serverSocket.accept();
419                if (socket != null) {
420                    if (isStopped() || getAcceptListener() == null) {
421                        socket.close();
422                    } else {
423                        if (useQueueForAccept) {
424                            socketQueue.put(socket);
425                        } else {
426                            handleSocket(socket);
427                        }
428                    }
429                }
430            } catch (SocketTimeoutException ste) {
431                // expect this to happen
432            } catch (Exception e) {
433                if (!isStopping()) {
434                    onAcceptError(e);
435                } else if (!isStopped()) {
436                    LOG.warn("run()", e);
437                    onAcceptError(e);
438                }
439            }
440        }
441    }
442
443    /**
444     * Allow derived classes to override the Transport implementation that this transport server creates.
445     *
446     * @param socket
447     * @param format
448     *
449     * @return a new Transport instance.
450     *
451     * @throws IOException
452     */
453    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
454        return new TcpTransport(format, socket);
455    }
456
457    /**
458     * @return pretty print of this
459     */
460    @Override
461    public String toString() {
462        return "" + getBindLocation();
463    }
464
465    /**
466     * @param socket
467     * @param bindAddress
468     * @return real hostName
469     * @throws UnknownHostException
470     */
471    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
472        String result = null;
473        if (socket.isBound()) {
474            if (socket.getInetAddress().isAnyLocalAddress()) {
475                // make it more human readable and useful, an alternative to 0.0.0.0
476                result = InetAddressUtil.getLocalHostName();
477            } else {
478                result = socket.getInetAddress().getCanonicalHostName();
479            }
480        } else {
481            result = bindAddress.getCanonicalHostName();
482        }
483        return result;
484    }
485
486    @Override
487    protected void doStart() throws Exception {
488        if (useQueueForAccept) {
489            Runnable run = new Runnable() {
490                @Override
491                public void run() {
492                    try {
493                        while (!isStopped() && !isStopping()) {
494                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
495                            if (sock != null) {
496                                try {
497                                    handleSocket(sock);
498                                } catch (Throwable thrown) {
499                                    if (!isStopping()) {
500                                        onAcceptError(new Exception(thrown));
501                                    } else if (!isStopped()) {
502                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
503                                        onAcceptError(new Exception(thrown));
504                                    }
505                                }
506                            }
507                        }
508
509                    } catch (InterruptedException e) {
510                        if (!isStopped() || !isStopping()) {
511                            LOG.info("socketQueue interrupted - stopping");
512                            onAcceptError(e);
513                        }
514                    }
515                }
516            };
517            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
518            socketHandlerThread.setDaemon(true);
519            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
520            socketHandlerThread.start();
521        }
522        super.doStart();
523    }
524
525    @Override
526    protected void doStop(ServiceStopper stopper) throws Exception {
527        Exception firstFailure = null;
528
529        try {
530            if (selector != null) {
531                selector.close();
532                selector = null;
533            }
534        } catch (Exception error) {
535        }
536
537        try {
538            final ServerSocket serverSocket = this.serverSocket;
539            if (serverSocket != null) {
540                this.serverSocket = null;
541                serverSocket.close();
542            }
543        } catch (Exception error) {
544            firstFailure = error;
545        }
546
547        if (socketHandlerThread != null) {
548            socketHandlerThread.interrupt();
549            socketHandlerThread = null;
550        }
551
552        try {
553            super.doStop(stopper);
554        } catch (Exception error) {
555            if (firstFailure != null) {
556                firstFailure = error;
557            }
558        }
559
560        if (firstFailure != null) {
561            throw firstFailure;
562        }
563    }
564
565    @Override
566    public InetSocketAddress getSocketAddress() {
567        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
568    }
569
570    protected void handleSocket(Socket socket) {
571        doHandleSocket(socket);
572    }
573
574    final protected void doHandleSocket(Socket socket) {
575        boolean closeSocket = true;
576        boolean countIncremented = false;
577        try {
578            int currentCount;
579            do {
580                currentCount = currentTransportCount.get();
581                if (currentCount >= this.maximumConnections) {
582                     throw new ExceededMaximumConnectionsException(
583                         "Exceeded the maximum number of allowed client connections. See the '" +
584                         "maximumConnections' property on the TCP transport configuration URI " +
585                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
586                 }
587
588            //Increment this value before configuring the transport
589            //This is necessary because some of the transport servers must read from the
590            //socket during configureTransport() so we want to make sure this value is
591            //accurate as the transport server could pause here waiting for data to be sent from a client
592            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
593            countIncremented = true;
594
595            HashMap<String, Object> options = new HashMap<String, Object>();
596            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
597            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
598            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
599            options.put("trace", Boolean.valueOf(trace));
600            options.put("soTimeout", Integer.valueOf(soTimeout));
601            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
602            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
603            options.put("logWriterName", logWriterName);
604            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
605            options.put("startLogging", Boolean.valueOf(startLogging));
606            options.put("jmxPort", Integer.valueOf(jmxPort));
607            options.putAll(transportOptions);
608
609            TransportInfo transportInfo = configureTransport(this, socket);
610            closeSocket = false;
611
612            if (transportInfo.transport instanceof ServiceSupport) {
613                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
614            }
615
616            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
617                    transportInfo.transport, transportInfo.format, options);
618
619            getAcceptListener().onAccept(configuredTransport);
620
621        } catch (SocketTimeoutException ste) {
622            // expect this to happen
623        } catch (Exception e) {
624            if (closeSocket) {
625                try {
626                    //if closing the socket, only decrement the count it was actually incremented
627                    //where it was incremented
628                    if (countIncremented) {
629                        currentTransportCount.decrementAndGet();
630                    }
631                    socket.close();
632                } catch (Exception ignore) {
633                }
634            }
635
636            if (!isStopping()) {
637                onAcceptError(e);
638            } else if (!isStopped()) {
639                LOG.warn("run()", e);
640                onAcceptError(e);
641            }
642        }
643    }
644
645    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
646        WireFormat format = wireFormatFactory.createWireFormat();
647        Transport transport = createTransport(socket, format);
648        return new TransportInfo(format, transport, transportFactory);
649    }
650
651    protected class TransportInfo {
652        final WireFormat format;
653        final Transport transport;
654        final TransportFactory transportFactory;
655
656        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
657            this.format = format;
658            this.transport = transport;
659            this.transportFactory = transportFactory;
660        }
661    }
662
663    public int getSoTimeout() {
664        return soTimeout;
665    }
666
667    public void setSoTimeout(int soTimeout) {
668        this.soTimeout = soTimeout;
669    }
670
671    public int getSocketBufferSize() {
672        return socketBufferSize;
673    }
674
675    public void setSocketBufferSize(int socketBufferSize) {
676        this.socketBufferSize = socketBufferSize;
677    }
678
679    public int getConnectionTimeout() {
680        return connectionTimeout;
681    }
682
683    public void setConnectionTimeout(int connectionTimeout) {
684        this.connectionTimeout = connectionTimeout;
685    }
686
687    /**
688     * @return the maximumConnections
689     */
690    public int getMaximumConnections() {
691        return maximumConnections;
692    }
693
694    /**
695     * @param maximumConnections
696     *            the maximumConnections to set
697     */
698    public void setMaximumConnections(int maximumConnections) {
699        this.maximumConnections = maximumConnections;
700    }
701
702    public AtomicInteger getCurrentTransportCount() {
703        return currentTransportCount;
704    }
705
706    @Override
707    public void started(Service service) {
708    }
709
710    @Override
711    public void stopped(Service service) {
712        this.currentTransportCount.decrementAndGet();
713    }
714
715    @Override
716    public boolean isSslServer() {
717        return false;
718    }
719
720    @Override
721    public boolean isAllowLinkStealing() {
722        return allowLinkStealing;
723    }
724
725    @Override
726    public void setAllowLinkStealing(boolean allowLinkStealing) {
727        this.allowLinkStealing = allowLinkStealing;
728    }
729}