001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.Socket;
026import java.net.SocketAddress;
027import java.net.SocketException;
028import java.net.SocketTimeoutException;
029import java.net.URI;
030import java.net.UnknownHostException;
031import java.nio.ByteBuffer;
032import java.security.cert.X509Certificate;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.concurrent.CountDownLatch;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicReference;
038
039import javax.net.SocketFactory;
040
041import org.apache.activemq.Service;
042import org.apache.activemq.TransportLoggerSupport;
043import org.apache.activemq.thread.TaskRunnerFactory;
044import org.apache.activemq.transport.Transport;
045import org.apache.activemq.transport.TransportThreadSupport;
046import org.apache.activemq.util.InetAddressUtil;
047import org.apache.activemq.util.IntrospectionSupport;
048import org.apache.activemq.util.ServiceStopper;
049import org.apache.activemq.wireformat.WireFormat;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * An implementation of the {@link Transport} interface using raw tcp/ip
055 */
056public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
057
    /** Logger shared by all instances of this transport. */
    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);

    /** Remote endpoint to connect to; {@code null} when the transport wraps an already accepted socket. */
    protected final URI remoteLocation;
    /** Optional local address and port the socket is bound to before connecting; may be {@code null}. */
    protected final URI localLocation;
    /** Wire format used to marshal outgoing commands and unmarshal incoming ones. */
    protected final WireFormat wireFormat;

    /** Connect timeout handed to {@code Socket.connect}; a negative value selects the overload without a timeout. */
    protected int connectionTimeout = 30000;
    /** Value passed to {@code Socket.setSoTimeout} when the socket is initialised. */
    protected int soTimeout;
    /** Size applied to both the receive and the send buffer of the socket; non-positive values are skipped. */
    protected int socketBufferSize = 64 * 1024;
    /** Buffer size handed to the buffered input and output streams wrapped around the socket. */
    protected int ioBufferSize = 8 * 1024;
    /** When {@code true}, {@code doStop} closes the socket from a separate task and waits only briefly for it. */
    protected boolean closeAsync=true;
    /** The underlying socket; may be {@code null} until {@code connect()} has created it. */
    protected Socket socket;
    /** Stream commands are marshalled to; created by {@code initializeStreams()}. */
    protected DataOutputStream dataOut;
    /** Stream commands are unmarshalled from; created by {@code initializeStreams()}. */
    protected DataInputStream dataIn;
    /** The buffered output stream behind {@link #dataOut}, exposed through {@code narrow(TimeStampStream.class)}. */
    protected TimeStampStream buffOut = null;

    /** Bytes already consumed from the socket before this transport was created; pushed back into the input stream. */
    protected final InitBuffer initBuffer;

    /**
     * The Traffic Class to be set on the socket.
     */
    protected int trafficClass = 0;
    /**
     * Keeps track of attempts to set the Traffic Class on the socket.
     */
    private boolean trafficClassSet = false;
    /**
     * Prevents setting both the Differentiated Services and Type of Service
     * transport options at the same time, since they share the same spot in
     * the TCP/IP packet headers.
     */
    protected boolean diffServChosen = false;
    protected boolean typeOfServiceChosen = false;
    /**
     * trace=true -> the Transport stack where this TcpTransport
     * object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport
     * object will be, will NOT have a TransportLogger layer, and therefore
     * will never be able to print logging messages.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean trace = false;
    /**
     * Name of the LogWriter implementation to use.
     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
    /**
     * Specifies if the TransportLogger will be manageable by JMX or not.
     * Also, as long as there is at least 1 TransportLogger which is manageable,
     * a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;
    /**
     * startLogging=true -> the TransportLogger object of the Transport stack
     * will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack
     * will initially NOT write messages to the log.
     * This parameter only has an effect if trace == true.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean startLogging = true;
    /**
     * Specifies the port that will be used by the JMX server to manage
     * the TransportLoggers.
     * This should only be set in an URI by a client (producer or consumer) since
     * a broker will already create a JMX server.
     * It is useful for people who test a broker and clients in the same machine
     * and want to control both via JMX; a different port will be needed.
     */
    protected int jmxPort = 1099;
    /** When {@code true}, a remote host equal to the local host name is replaced by "localhost". */
    protected boolean useLocalHost = false;
    /** Minimum wire format version (the spelling "minmum" is part of the existing property name). */
    protected int minmumWireFormatVersion;
    /** Factory used to create the socket when none was supplied. */
    protected SocketFactory socketFactory;
    /** Holds the latch that is counted down when {@code run()} exits; installed by {@code doStart()}. */
    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
    /** Incremented by the input stream wrapper on every read-side operation. */
    protected volatile int receiveCounter;

    /** Extra socket properties applied by introspection when the socket is initialised; may be {@code null}. */
    protected Map<String, Object> socketOptions;
    /** SO_LINGER setting: enabled if > -1, disabled if == -1, any other value leaves the socket untouched. */
    private int soLinger = Integer.MIN_VALUE;
    /** SO_KEEPALIVE setting; {@code null} leaves the socket untouched. */
    private Boolean keepAlive;
    /** TCP_NODELAY setting; {@code null} leaves the socket untouched. */
    private Boolean tcpNoDelay;
    /** Thread executing {@code run()}; lets {@code stop()} avoid waiting on itself. */
    private Thread runnerThread;
141
142    /**
143     * Connect to a remote Node - e.g. a Broker
144     *
145     * @param wireFormat
146     * @param socketFactory
147     * @param remoteLocation
148     * @param localLocation - e.g. local InetAddress and local port
149     * @throws IOException
150     * @throws UnknownHostException
151     */
152    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
153                        URI localLocation) throws UnknownHostException, IOException {
154        this.wireFormat = wireFormat;
155        this.socketFactory = socketFactory;
156        try {
157            this.socket = socketFactory.createSocket();
158        } catch (SocketException e) {
159            this.socket = null;
160        }
161        this.remoteLocation = remoteLocation;
162        this.localLocation = localLocation;
163        this.initBuffer = null;
164        setDaemon(false);
165    }
166
167    /**
168     * Initialize from a server Socket
169     *
170     * @param wireFormat
171     * @param socket
172     * @throws IOException
173     */
174    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
175        this(wireFormat, socket, null);
176    }
177
178    public TcpTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
179        this.wireFormat = wireFormat;
180        this.socket = socket;
181        this.remoteLocation = null;
182        this.localLocation = null;
183        this.initBuffer = initBuffer;
184        setDaemon(true);
185    }
186
187    /**
188     * A one way asynchronous send
189     */
190    @Override
191    public void oneway(Object command) throws IOException {
192        checkStarted();
193        wireFormat.marshal(command, dataOut);
194        dataOut.flush();
195    }
196
197    /**
198     * @return pretty print of 'this'
199     */
200    @Override
201    public String toString() {
202        return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort()
203                : (localLocation != null ? localLocation : remoteLocation)) ;
204    }
205
206    /**
207     * reads packets from a Socket
208     */
209    @Override
210    public void run() {
211        LOG.trace("TCP consumer thread for " + this + " starting");
212        this.runnerThread=Thread.currentThread();
213        try {
214            while (!isStopped()) {
215                doRun();
216            }
217        } catch (IOException e) {
218            stoppedLatch.get().countDown();
219            onException(e);
220        } catch (Throwable e){
221            stoppedLatch.get().countDown();
222            IOException ioe=new IOException("Unexpected error occurred: " + e);
223            ioe.initCause(e);
224            onException(ioe);
225        }finally {
226            stoppedLatch.get().countDown();
227        }
228    }
229
230    protected void doRun() throws IOException {
231        try {
232            Object command = readCommand();
233            doConsume(command);
234        } catch (SocketTimeoutException e) {
235        } catch (InterruptedIOException e) {
236        }
237    }
238
239    protected Object readCommand() throws IOException {
240        return wireFormat.unmarshal(dataIn);
241    }
242
243    // Properties
244    // -------------------------------------------------------------------------
245    public String getDiffServ() {
246        // This is the value requested by the user by setting the Tcp Transport
247        // options. If the socket hasn't been created, then this value may not
248        // reflect the value returned by Socket.getTrafficClass().
249        return Integer.toString(this.trafficClass);
250    }
251
252    public void setDiffServ(String diffServ) throws IllegalArgumentException {
253        this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
254        this.diffServChosen = true;
255    }
256
257    public int getTypeOfService() {
258        // This is the value requested by the user by setting the Tcp Transport
259        // options. If the socket hasn't been created, then this value may not
260        // reflect the value returned by Socket.getTrafficClass().
261        return this.trafficClass;
262    }
263
264    public void setTypeOfService(int typeOfService) {
265        this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
266        this.typeOfServiceChosen = true;
267    }
268
269    public boolean isTrace() {
270        return trace;
271    }
272
273    public void setTrace(boolean trace) {
274        this.trace = trace;
275    }
276
277    public String getLogWriterName() {
278        return logWriterName;
279    }
280
281    public void setLogWriterName(String logFormat) {
282        this.logWriterName = logFormat;
283    }
284
285    public boolean isDynamicManagement() {
286        return dynamicManagement;
287    }
288
289    public void setDynamicManagement(boolean useJmx) {
290        this.dynamicManagement = useJmx;
291    }
292
293    public boolean isStartLogging() {
294        return startLogging;
295    }
296
297    public void setStartLogging(boolean startLogging) {
298        this.startLogging = startLogging;
299    }
300
301    public int getJmxPort() {
302        return jmxPort;
303    }
304
305    public void setJmxPort(int jmxPort) {
306        this.jmxPort = jmxPort;
307    }
308
309    public int getMinmumWireFormatVersion() {
310        return minmumWireFormatVersion;
311    }
312
313    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
314        this.minmumWireFormatVersion = minmumWireFormatVersion;
315    }
316
317    public boolean isUseLocalHost() {
318        return useLocalHost;
319    }
320
321    /**
322     * Sets whether 'localhost' or the actual local host name should be used to
323     * make local connections. On some operating systems such as Macs its not
324     * possible to connect as the local host name so localhost is better.
325     */
326    public void setUseLocalHost(boolean useLocalHost) {
327        this.useLocalHost = useLocalHost;
328    }
329
330    public int getSocketBufferSize() {
331        return socketBufferSize;
332    }
333
334    /**
335     * Sets the buffer size to use on the socket
336     */
337    public void setSocketBufferSize(int socketBufferSize) {
338        this.socketBufferSize = socketBufferSize;
339    }
340
341    public int getSoTimeout() {
342        return soTimeout;
343    }
344
345    /**
346     * Sets the socket timeout
347     */
348    public void setSoTimeout(int soTimeout) {
349        this.soTimeout = soTimeout;
350    }
351
352    public int getConnectionTimeout() {
353        return connectionTimeout;
354    }
355
356    /**
357     * Sets the timeout used to connect to the socket
358     */
359    public void setConnectionTimeout(int connectionTimeout) {
360        this.connectionTimeout = connectionTimeout;
361    }
362
363    public Boolean getKeepAlive() {
364        return keepAlive;
365    }
366
367    /**
368     * Enable/disable TCP KEEP_ALIVE mode
369     */
370    public void setKeepAlive(Boolean keepAlive) {
371        this.keepAlive = keepAlive;
372    }
373
374    /**
375     * Enable/disable soLinger
376     * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
377     */
378    public void setSoLinger(int soLinger) {
379        this.soLinger = soLinger;
380    }
381
382    public int getSoLinger() {
383        return soLinger;
384    }
385
386    public Boolean getTcpNoDelay() {
387        return tcpNoDelay;
388    }
389
390    /**
391     * Enable/disable the TCP_NODELAY option on the socket
392     */
393    public void setTcpNoDelay(Boolean tcpNoDelay) {
394        this.tcpNoDelay = tcpNoDelay;
395    }
396
397    /**
398     * @return the ioBufferSize
399     */
400    public int getIoBufferSize() {
401        return this.ioBufferSize;
402    }
403
404    /**
405     * @param ioBufferSize the ioBufferSize to set
406     */
407    public void setIoBufferSize(int ioBufferSize) {
408        this.ioBufferSize = ioBufferSize;
409    }
410
411    /**
412     * @return the closeAsync
413     */
414    public boolean isCloseAsync() {
415        return closeAsync;
416    }
417
418    /**
419     * @param closeAsync the closeAsync to set
420     */
421    public void setCloseAsync(boolean closeAsync) {
422        this.closeAsync = closeAsync;
423    }
424
425    // Implementation methods
426    // -------------------------------------------------------------------------
427    protected String resolveHostName(String host) throws UnknownHostException {
428        if (isUseLocalHost()) {
429            String localName = InetAddressUtil.getLocalHostName();
430            if (localName != null && localName.equals(host)) {
431                return "localhost";
432            }
433        }
434        return host;
435    }
436
437    /**
438     * Configures the socket for use
439     *
440     * @param sock  the socket
441     * @throws SocketException, IllegalArgumentException if setting the options
442     *         on the socket failed.
443     */
444    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
445        if (socketOptions != null) {
446            // copy the map as its used values is being removed when calling setProperties
447            // and we need to be able to set the options again in case socket is re-initailized
448            Map<String, Object> copy = new HashMap<String, Object>(socketOptions);
449            IntrospectionSupport.setProperties(socket, copy);
450            if (!copy.isEmpty()) {
451                throw new IllegalArgumentException("Invalid socket parameters: " + copy);
452            }
453        }
454
455        try {
456            //only positive values are legal
457            if (socketBufferSize > 0) {
458                sock.setReceiveBufferSize(socketBufferSize);
459                sock.setSendBufferSize(socketBufferSize);
460            } else {
461                LOG.warn("Socket buffer size was set to {}; Skipping this setting as the size must be a positive number.", socketBufferSize);
462            }
463        } catch (SocketException se) {
464            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
465            LOG.debug("Cannot set socket buffer size. Reason: " + se.getMessage() + ". This exception is ignored.", se);
466        }
467        sock.setSoTimeout(soTimeout);
468
469        if (keepAlive != null) {
470            sock.setKeepAlive(keepAlive.booleanValue());
471        }
472
473        if (soLinger > -1) {
474            sock.setSoLinger(true, soLinger);
475        } else if (soLinger == -1) {
476            sock.setSoLinger(false, 0);
477        }
478        if (tcpNoDelay != null) {
479            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
480        }
481        if (!this.trafficClassSet) {
482            this.trafficClassSet = setTrafficClass(sock);
483        }
484    }
485
486    @Override
487    protected void doStart() throws Exception {
488        connect();
489        stoppedLatch.set(new CountDownLatch(1));
490        super.doStart();
491    }
492
493    protected void connect() throws Exception {
494
495        if (socket == null && socketFactory == null) {
496            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
497        }
498
499        InetSocketAddress localAddress = null;
500        InetSocketAddress remoteAddress = null;
501
502        if (localLocation != null) {
503            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
504                                                 localLocation.getPort());
505        }
506
507        if (remoteLocation != null) {
508            String host = resolveHostName(remoteLocation.getHost());
509            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
510        }
511        // Set the traffic class before the socket is connected when possible so
512        // that the connection packets are given the correct traffic class.
513        this.trafficClassSet = setTrafficClass(socket);
514
515        if (socket != null) {
516
517            if (localAddress != null) {
518                socket.bind(localAddress);
519            }
520
521            // If it's a server accepted socket.. we don't need to connect it
522            // to a remote address.
523            if (remoteAddress != null) {
524                if (connectionTimeout >= 0) {
525                    socket.connect(remoteAddress, connectionTimeout);
526                } else {
527                    socket.connect(remoteAddress);
528                }
529            }
530
531        } else {
532            // For SSL sockets.. you can't create an unconnected socket :(
533            // This means the timout option are not supported either.
534            if (localAddress != null) {
535                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
536                                                    localAddress.getAddress(), localAddress.getPort());
537            } else {
538                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
539            }
540        }
541
542        initialiseSocket(socket);
543        initializeStreams();
544    }
545
546    @Override
547    protected void doStop(ServiceStopper stopper) throws Exception {
548        if (LOG.isDebugEnabled()) {
549            LOG.debug("Stopping transport " + this);
550        }
551
552        // Closing the streams flush the sockets before closing.. if the socket
553        // is hung.. then this hangs the close.
554        // closeStreams();
555        if (socket != null) {
556            if (closeAsync) {
557                //closing the socket can hang also
558                final CountDownLatch latch = new CountDownLatch(1);
559
560                // need a async task for this
561                final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
562                taskRunnerFactory.execute(new Runnable() {
563                    @Override
564                    public void run() {
565                        LOG.trace("Closing socket {}", socket);
566                        try {
567                            socket.close();
568                            LOG.debug("Closed socket {}", socket);
569                        } catch (IOException e) {
570                            if (LOG.isDebugEnabled()) {
571                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
572                            }
573                        } finally {
574                            latch.countDown();
575                        }
576                    }
577                });
578
579                try {
580                    latch.await(1,TimeUnit.SECONDS);
581                } catch (InterruptedException e) {
582                    Thread.currentThread().interrupt();
583                } finally {
584                    taskRunnerFactory.shutdownNow();
585                }
586
587            } else {
588                // close synchronously
589                LOG.trace("Closing socket {}", socket);
590                try {
591                    socket.close();
592                    LOG.debug("Closed socket {}", socket);
593                } catch (IOException e) {
594                    if (LOG.isDebugEnabled()) {
595                        LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
596                    }
597                }
598            }
599        }
600    }
601
602    /**
603     * Override so that stop() blocks until the run thread is no longer running.
604     */
605    @Override
606    public void stop() throws Exception {
607        super.stop();
608        CountDownLatch countDownLatch = stoppedLatch.get();
609        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
610            countDownLatch.await(1,TimeUnit.SECONDS);
611        }
612    }
613
614    protected void initializeStreams() throws Exception {
615        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
616            @Override
617            public int read() throws IOException {
618                receiveCounter++;
619                return super.read();
620            }
621            @Override
622            public int read(byte[] b, int off, int len) throws IOException {
623                receiveCounter++;
624                return super.read(b, off, len);
625            }
626            @Override
627            public long skip(long n) throws IOException {
628                receiveCounter++;
629                return super.skip(n);
630            }
631            @Override
632            protected void fill() throws IOException {
633                receiveCounter++;
634                super.fill();
635            }
636        };
637        //Unread the initBuffer that was used for protocol detection if it exists
638        //so the stream can start over
639        if (initBuffer != null) {
640            buffIn.unread(initBuffer.buffer.array());
641        }
642        this.dataIn = new DataInputStream(buffIn);
643        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
644        this.dataOut = new DataOutputStream(outputStream);
645        this.buffOut = outputStream;
646
647    }
648
649    protected void closeStreams() throws IOException {
650        if (dataOut != null) {
651            dataOut.close();
652        }
653        if (dataIn != null) {
654            dataIn.close();
655        }
656    }
657
658    public void setSocketOptions(Map<String, Object> socketOptions) {
659        this.socketOptions = new HashMap<String, Object>(socketOptions);
660    }
661
662    @Override
663    public String getRemoteAddress() {
664        if (socket != null) {
665            SocketAddress address = socket.getRemoteSocketAddress();
666            if (address instanceof InetSocketAddress) {
667                return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
668            } else {
669                return "" + socket.getRemoteSocketAddress();
670            }
671        }
672        return null;
673    }
674
675    @Override
676    public <T> T narrow(Class<T> target) {
677        if (target == Socket.class) {
678            return target.cast(socket);
679        } else if ( target == TimeStampStream.class) {
680            return target.cast(buffOut);
681        }
682        return super.narrow(target);
683    }
684
685    @Override
686    public int getReceiveCounter() {
687        return receiveCounter;
688    }
689
690    public static class InitBuffer {
691        public final int readSize;
692        public final ByteBuffer buffer;
693
694        public InitBuffer(int readSize, ByteBuffer buffer) {
695            if (buffer == null) {
696                throw new IllegalArgumentException("Null buffer not allowed.");
697            }
698            this.readSize = readSize;
699            this.buffer = buffer;
700        }
701    }
702
703    /**
704     * @param sock The socket on which to set the Traffic Class.
705     * @return Whether or not the Traffic Class was set on the given socket.
706     * @throws SocketException if the system does not support setting the
707     *         Traffic Class.
708     * @throws IllegalArgumentException if both the Differentiated Services and
709     *         Type of Services transport options have been set on the same
710     *         connection.
711     */
712    private boolean setTrafficClass(Socket sock) throws SocketException,
713            IllegalArgumentException {
714        if (sock == null
715            || (!this.diffServChosen && !this.typeOfServiceChosen)) {
716            return false;
717        }
718        if (this.diffServChosen && this.typeOfServiceChosen) {
719            throw new IllegalArgumentException("Cannot set both the "
720                + " Differentiated Services and Type of Services transport "
721                + " options on the same connection.");
722        }
723
724        sock.setTrafficClass(this.trafficClass);
725
726        int resultTrafficClass = sock.getTrafficClass();
727        if (this.trafficClass != resultTrafficClass) {
728            // In the case where the user has specified the ECN bits (e.g. in
729            // Type of Service) but the system won't allow the ECN bits to be
730            // set or in the case where setting the traffic class failed for
731            // other reasons, emit a warning.
732            if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
733                    && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
734                LOG.warn("Attempted to set the Traffic Class to "
735                    + this.trafficClass + " but the result Traffic Class was "
736                    + resultTrafficClass + ". Please check that your system "
737                    + "allows you to set the ECN bits (the first two bits).");
738            } else {
739                LOG.warn("Attempted to set the Traffic Class to "
740                    + this.trafficClass + " but the result Traffic Class was "
741                    + resultTrafficClass + ". Please check that your system "
742                         + "supports java.net.setTrafficClass.");
743            }
744            return false;
745        }
746        // Reset the guards that prevent both the Differentiated Services
747        // option and the Type of Service option from being set on the same
748        // connection.
749        this.diffServChosen = false;
750        this.typeOfServiceChosen = false;
751        return true;
752    }
753
754    @Override
755    public WireFormat getWireFormat() {
756        return wireFormat;
757    }
758
759    @Override
760    public X509Certificate[] getPeerCertificates() {
761        return null;
762    }
763
764    @Override
765    public void setPeerCertificates(X509Certificate[] certificates) {
766    }
767}