001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.InterruptedIOException;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.Socket;
026    import java.net.SocketAddress;
027    import java.net.SocketException;
028    import java.net.SocketTimeoutException;
029    import java.net.URI;
030    import java.net.UnknownHostException;
031    import java.util.HashMap;
032    import java.util.Map;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicReference;
036    
037    import javax.net.SocketFactory;
038    
039    import org.apache.activemq.Service;
040    import org.apache.activemq.thread.TaskRunnerFactory;
041    import org.apache.activemq.transport.Transport;
042    import org.apache.activemq.transport.TransportLoggerFactory;
043    import org.apache.activemq.transport.TransportThreadSupport;
044    import org.apache.activemq.util.InetAddressUtil;
045    import org.apache.activemq.util.IntrospectionSupport;
046    import org.apache.activemq.util.ServiceStopper;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.slf4j.Logger;
049    import org.slf4j.LoggerFactory;
050    
051    /**
052     * An implementation of the {@link Transport} interface using raw tcp/ip
053     *
054     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055     *
056     */
057    public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058        private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
059        protected final URI remoteLocation;
060        protected final URI localLocation;
061        protected final WireFormat wireFormat;
062    
063        protected int connectionTimeout = 30000;
064        protected int soTimeout;
065        protected int socketBufferSize = 64 * 1024;
066        protected int ioBufferSize = 8 * 1024;
067        protected boolean closeAsync=true;
068        protected Socket socket;
069        protected DataOutputStream dataOut;
070        protected DataInputStream dataIn;
071        protected TimeStampStream buffOut = null;
072    
073        /**
074         * The Traffic Class to be set on the socket.
075         */
076        protected int trafficClass = 0;
077        /**
078         * Keeps track of attempts to set the Traffic Class on the socket.
079         */
080        private boolean trafficClassSet = false;
081        /**
082         * Prevents setting both the Differentiated Services and Type of Service
083         * transport options at the same time, since they share the same spot in
084         * the TCP/IP packet headers.
085         */
086        protected boolean diffServChosen = false;
087        protected boolean typeOfServiceChosen = false;
088        /**
089         * trace=true -> the Transport stack where this TcpTransport
090         * object will be, will have a TransportLogger layer
091         * trace=false -> the Transport stack where this TcpTransport
092         * object will be, will NOT have a TransportLogger layer, and therefore
093         * will never be able to print logging messages.
094         * This parameter is most probably set in Connection or TransportConnector URIs.
095         */
096        protected boolean trace = false;
097        /**
098         * Name of the LogWriter implementation to use.
099         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
100         * This parameter is most probably set in Connection or TransportConnector URIs.
101         */
102        protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
103        /**
104         * Specifies if the TransportLogger will be manageable by JMX or not.
105         * Also, as long as there is at least 1 TransportLogger which is manageable,
106         * a TransportLoggerControl MBean will me created.
107         */
108        protected boolean dynamicManagement = false;
109        /**
110         * startLogging=true -> the TransportLogger object of the Transport stack
111         * will initially write messages to the log.
112         * startLogging=false -> the TransportLogger object of the Transport stack
113         * will initially NOT write messages to the log.
114         * This parameter only has an effect if trace == true.
115         * This parameter is most probably set in Connection or TransportConnector URIs.
116         */
117        protected boolean startLogging = true;
118        /**
119         * Specifies the port that will be used by the JMX server to manage
120         * the TransportLoggers.
121         * This should only be set in an URI by a client (producer or consumer) since
122         * a broker will already create a JMX server.
123         * It is useful for people who test a broker and clients in the same machine
124         * and want to control both via JMX; a different port will be needed.
125         */
126        protected int jmxPort = 1099;
127        protected boolean useLocalHost = false;
128        protected int minmumWireFormatVersion;
129        protected SocketFactory socketFactory;
130        protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
131    
132        private Map<String, Object> socketOptions;
133        private int soLinger = Integer.MIN_VALUE;
134        private Boolean keepAlive;
135        private Boolean tcpNoDelay;
136        private Thread runnerThread;
137        private volatile int receiveCounter;
138    
139        /**
140         * Connect to a remote Node - e.g. a Broker
141         *
142         * @param wireFormat
143         * @param socketFactory
144         * @param remoteLocation
145         * @param localLocation - e.g. local InetAddress and local port
146         * @throws IOException
147         * @throws UnknownHostException
148         */
149        public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
150                            URI localLocation) throws UnknownHostException, IOException {
151            this.wireFormat = wireFormat;
152            this.socketFactory = socketFactory;
153            try {
154                this.socket = socketFactory.createSocket();
155            } catch (SocketException e) {
156                this.socket = null;
157            }
158            this.remoteLocation = remoteLocation;
159            this.localLocation = localLocation;
160            setDaemon(false);
161        }
162    
163        /**
164         * Initialize from a server Socket
165         *
166         * @param wireFormat
167         * @param socket
168         * @throws IOException
169         */
170        public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
171            this.wireFormat = wireFormat;
172            this.socket = socket;
173            this.remoteLocation = null;
174            this.localLocation = null;
175            setDaemon(true);
176        }
177    
178        /**
179         * A one way asynchronous send
180         */
181        public void oneway(Object command) throws IOException {
182            checkStarted();
183            wireFormat.marshal(command, dataOut);
184            dataOut.flush();
185        }
186    
187        /**
188         * @return pretty print of 'this'
189         */
190        @Override
191        public String toString() {
192            return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort()
193                    : (localLocation != null ? localLocation : remoteLocation)) ;
194        }
195    
196        /**
197         * reads packets from a Socket
198         */
199        public void run() {
200            LOG.trace("TCP consumer thread for " + this + " starting");
201            this.runnerThread=Thread.currentThread();
202            try {
203                while (!isStopped()) {
204                    doRun();
205                }
206            } catch (IOException e) {
207                stoppedLatch.get().countDown();
208                onException(e);
209            } catch (Throwable e){
210                stoppedLatch.get().countDown();
211                IOException ioe=new IOException("Unexpected error occured: " + e);
212                ioe.initCause(e);
213                onException(ioe);
214            }finally {
215                stoppedLatch.get().countDown();
216            }
217        }
218    
219        protected void doRun() throws IOException {
220            try {
221                Object command = readCommand();
222                doConsume(command);
223            } catch (SocketTimeoutException e) {
224            } catch (InterruptedIOException e) {
225            }
226        }
227    
228        protected Object readCommand() throws IOException {
229            return wireFormat.unmarshal(dataIn);
230        }
231    
232        // Properties
233        // -------------------------------------------------------------------------
234        public String getDiffServ() {
235            // This is the value requested by the user by setting the Tcp Transport
236            // options. If the socket hasn't been created, then this value may not
237            // reflect the value returned by Socket.getTrafficClass().
238            return Integer.toString(this.trafficClass);
239        }
240    
241        public void setDiffServ(String diffServ) throws IllegalArgumentException {
242            this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
243            this.diffServChosen = true;
244        }
245    
246        public int getTypeOfService() {
247            // This is the value requested by the user by setting the Tcp Transport
248            // options. If the socket hasn't been created, then this value may not
249            // reflect the value returned by Socket.getTrafficClass().
250            return this.trafficClass;
251        }
252    
253        public void setTypeOfService(int typeOfService) {
254            this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
255            this.typeOfServiceChosen = true;
256        }
257    
258        public boolean isTrace() {
259            return trace;
260        }
261    
262        public void setTrace(boolean trace) {
263            this.trace = trace;
264        }
265    
266        public String getLogWriterName() {
267            return logWriterName;
268        }
269    
270        public void setLogWriterName(String logFormat) {
271            this.logWriterName = logFormat;
272        }
273    
274        public boolean isDynamicManagement() {
275            return dynamicManagement;
276        }
277    
278        public void setDynamicManagement(boolean useJmx) {
279            this.dynamicManagement = useJmx;
280        }
281    
282        public boolean isStartLogging() {
283            return startLogging;
284        }
285    
286        public void setStartLogging(boolean startLogging) {
287            this.startLogging = startLogging;
288        }
289    
290        public int getJmxPort() {
291            return jmxPort;
292        }
293    
294        public void setJmxPort(int jmxPort) {
295            this.jmxPort = jmxPort;
296        }
297    
298        public int getMinmumWireFormatVersion() {
299            return minmumWireFormatVersion;
300        }
301    
302        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
303            this.minmumWireFormatVersion = minmumWireFormatVersion;
304        }
305    
306        public boolean isUseLocalHost() {
307            return useLocalHost;
308        }
309    
310        /**
311         * Sets whether 'localhost' or the actual local host name should be used to
312         * make local connections. On some operating systems such as Macs its not
313         * possible to connect as the local host name so localhost is better.
314         */
315        public void setUseLocalHost(boolean useLocalHost) {
316            this.useLocalHost = useLocalHost;
317        }
318    
319        public int getSocketBufferSize() {
320            return socketBufferSize;
321        }
322    
323        /**
324         * Sets the buffer size to use on the socket
325         */
326        public void setSocketBufferSize(int socketBufferSize) {
327            this.socketBufferSize = socketBufferSize;
328        }
329    
330        public int getSoTimeout() {
331            return soTimeout;
332        }
333    
334        /**
335         * Sets the socket timeout
336         */
337        public void setSoTimeout(int soTimeout) {
338            this.soTimeout = soTimeout;
339        }
340    
341        public int getConnectionTimeout() {
342            return connectionTimeout;
343        }
344    
345        /**
346         * Sets the timeout used to connect to the socket
347         */
348        public void setConnectionTimeout(int connectionTimeout) {
349            this.connectionTimeout = connectionTimeout;
350        }
351    
352        public Boolean getKeepAlive() {
353            return keepAlive;
354        }
355    
356        /**
357         * Enable/disable TCP KEEP_ALIVE mode
358         */
359        public void setKeepAlive(Boolean keepAlive) {
360            this.keepAlive = keepAlive;
361        }
362    
363        /**
364         * Enable/disable soLinger
365         * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
366         */
367        public void setSoLinger(int soLinger) {
368            this.soLinger = soLinger;
369        }
370    
371        public int getSoLinger() {
372            return soLinger;
373        }
374    
375        public Boolean getTcpNoDelay() {
376            return tcpNoDelay;
377        }
378    
379        /**
380         * Enable/disable the TCP_NODELAY option on the socket
381         */
382        public void setTcpNoDelay(Boolean tcpNoDelay) {
383            this.tcpNoDelay = tcpNoDelay;
384        }
385    
386        /**
387         * @return the ioBufferSize
388         */
389        public int getIoBufferSize() {
390            return this.ioBufferSize;
391        }
392    
393        /**
394         * @param ioBufferSize the ioBufferSize to set
395         */
396        public void setIoBufferSize(int ioBufferSize) {
397            this.ioBufferSize = ioBufferSize;
398        }
399    
400        /**
401         * @return the closeAsync
402         */
403        public boolean isCloseAsync() {
404            return closeAsync;
405        }
406    
407        /**
408         * @param closeAsync the closeAsync to set
409         */
410        public void setCloseAsync(boolean closeAsync) {
411            this.closeAsync = closeAsync;
412        }
413    
414        // Implementation methods
415        // -------------------------------------------------------------------------
416        protected String resolveHostName(String host) throws UnknownHostException {
417            if (isUseLocalHost()) {
418                String localName = InetAddressUtil.getLocalHostName();
419                if (localName != null && localName.equals(host)) {
420                    return "localhost";
421                }
422            }
423            return host;
424        }
425    
426        /**
427         * Configures the socket for use
428         *
429         * @param sock
430         * @throws SocketException, IllegalArgumentException if setting the options
431         *         on the socket failed.
432         */
433        protected void initialiseSocket(Socket sock) throws SocketException,
434                IllegalArgumentException {
435            if (socketOptions != null) {
436                IntrospectionSupport.setProperties(socket, socketOptions);
437            }
438    
439            try {
440                sock.setReceiveBufferSize(socketBufferSize);
441                sock.setSendBufferSize(socketBufferSize);
442            } catch (SocketException se) {
443                LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
444                LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
445            }
446            sock.setSoTimeout(soTimeout);
447    
448            if (keepAlive != null) {
449                sock.setKeepAlive(keepAlive.booleanValue());
450            }
451    
452            if (soLinger > -1) {
453                sock.setSoLinger(true, soLinger);
454            } else if (soLinger == -1) {
455                sock.setSoLinger(false, 0);
456            }
457            if (tcpNoDelay != null) {
458                sock.setTcpNoDelay(tcpNoDelay.booleanValue());
459            }
460            if (!this.trafficClassSet) {
461                this.trafficClassSet = setTrafficClass(sock);
462            }
463        }
464    
465        @Override
466        protected void doStart() throws Exception {
467            connect();
468            stoppedLatch.set(new CountDownLatch(1));
469            super.doStart();
470        }
471    
472        protected void connect() throws Exception {
473    
474            if (socket == null && socketFactory == null) {
475                throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
476            }
477    
478            InetSocketAddress localAddress = null;
479            InetSocketAddress remoteAddress = null;
480    
481            if (localLocation != null) {
482                localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
483                                                     localLocation.getPort());
484            }
485    
486            if (remoteLocation != null) {
487                String host = resolveHostName(remoteLocation.getHost());
488                remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
489            }
490            // Set the traffic class before the socket is connected when possible so
491            // that the connection packets are given the correct traffic class.
492            this.trafficClassSet = setTrafficClass(socket);
493    
494            if (socket != null) {
495    
496                if (localAddress != null) {
497                    socket.bind(localAddress);
498                }
499    
500                // If it's a server accepted socket.. we don't need to connect it
501                // to a remote address.
502                if (remoteAddress != null) {
503                    if (connectionTimeout >= 0) {
504                        socket.connect(remoteAddress, connectionTimeout);
505                    } else {
506                        socket.connect(remoteAddress);
507                    }
508                }
509    
510            } else {
511                // For SSL sockets.. you can't create an unconnected socket :(
512                // This means the timout option are not supported either.
513                if (localAddress != null) {
514                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
515                                                        localAddress.getAddress(), localAddress.getPort());
516                } else {
517                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
518                }
519            }
520    
521            initialiseSocket(socket);
522            initializeStreams();
523        }
524    
525        @Override
526        protected void doStop(ServiceStopper stopper) throws Exception {
527            if (LOG.isDebugEnabled()) {
528                LOG.debug("Stopping transport " + this);
529            }
530    
531            // Closing the streams flush the sockets before closing.. if the socket
532            // is hung.. then this hangs the close.
533            // closeStreams();
534            if (socket != null) {
535                if (closeAsync) {
536                    //closing the socket can hang also
537                    final CountDownLatch latch = new CountDownLatch(1);
538    
539                    // need a async task for this
540                    final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
541                    taskRunnerFactory.execute(new Runnable() {
542                        public void run() {
543                            LOG.trace("Closing socket {}", socket);
544                            try {
545                                socket.close();
546                                LOG.debug("Closed socket {}", socket);
547                            } catch (IOException e) {
548                                if (LOG.isDebugEnabled()) {
549                                    LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
550                                }
551                            } finally {
552                                latch.countDown();
553                            }
554                        }
555                    });
556    
557                    try {
558                        latch.await(1,TimeUnit.SECONDS);
559                    } catch (InterruptedException e) {
560                        Thread.currentThread().interrupt();
561                    } finally {
562                        taskRunnerFactory.shutdownNow();
563                    }
564    
565                } else {
566                    // close synchronously
567                    LOG.trace("Closing socket {}", socket);
568                    try {
569                        socket.close();
570                        LOG.debug("Closed socket {}", socket);
571                    } catch (IOException e) {
572                        if (LOG.isDebugEnabled()) {
573                            LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
574                        }
575                    }
576                }
577            }
578        }
579    
580        /**
581         * Override so that stop() blocks until the run thread is no longer running.
582         */
583        @Override
584        public void stop() throws Exception {
585            super.stop();
586            CountDownLatch countDownLatch = stoppedLatch.get();
587            if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
588                countDownLatch.await(1,TimeUnit.SECONDS);
589            }
590        }
591    
592        protected void initializeStreams() throws Exception {
593            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
594                @Override
595                public int read() throws IOException {
596                    receiveCounter++;
597                    return super.read();
598                }
599                @Override
600                public int read(byte[] b, int off, int len) throws IOException {
601                    receiveCounter++;
602                    return super.read(b, off, len);
603                }
604                @Override
605                public long skip(long n) throws IOException {
606                    receiveCounter++;
607                    return super.skip(n);
608                }
609                @Override
610                protected void fill() throws IOException {
611                    receiveCounter++;
612                    super.fill();
613                }
614            };
615            this.dataIn = new DataInputStream(buffIn);
616            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
617            this.dataOut = new DataOutputStream(outputStream);
618            this.buffOut = outputStream;
619        }
620    
621        protected void closeStreams() throws IOException {
622            if (dataOut != null) {
623                dataOut.close();
624            }
625            if (dataIn != null) {
626                dataIn.close();
627            }
628        }
629    
630        public void setSocketOptions(Map<String, Object> socketOptions) {
631            this.socketOptions = new HashMap<String, Object>(socketOptions);
632        }
633    
634        public String getRemoteAddress() {
635            if (socket != null) {
636                SocketAddress address = socket.getRemoteSocketAddress();
637                if (address instanceof InetSocketAddress) {
638                    return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
639                } else {
640                    return "" + socket.getRemoteSocketAddress();
641                }
642            }
643            return null;
644        }
645    
646        @Override
647        public <T> T narrow(Class<T> target) {
648            if (target == Socket.class) {
649                return target.cast(socket);
650            } else if ( target == TimeStampStream.class) {
651                return target.cast(buffOut);
652            }
653            return super.narrow(target);
654        }
655    
656        public int getReceiveCounter() {
657            return receiveCounter;
658        }
659    
660        /**
661         * @param sock The socket on which to set the Traffic Class.
662         * @return Whether or not the Traffic Class was set on the given socket.
663         * @throws SocketException if the system does not support setting the
664         *         Traffic Class.
665         * @throws IllegalArgumentException if both the Differentiated Services and
666         *         Type of Services transport options have been set on the same
667         *         connection.
668         */
669        private boolean setTrafficClass(Socket sock) throws SocketException,
670                IllegalArgumentException {
671            if (sock == null
672                || (!this.diffServChosen && !this.typeOfServiceChosen)) {
673                return false;
674            }
675            if (this.diffServChosen && this.typeOfServiceChosen) {
676                throw new IllegalArgumentException("Cannot set both the "
677                    + " Differentiated Services and Type of Services transport "
678                    + " options on the same connection.");
679            }
680    
681            sock.setTrafficClass(this.trafficClass);
682    
683            int resultTrafficClass = sock.getTrafficClass();
684            if (this.trafficClass != resultTrafficClass) {
685                // In the case where the user has specified the ECN bits (e.g. in
686                // Type of Service) but the system won't allow the ECN bits to be
687                // set or in the case where setting the traffic class failed for
688                // other reasons, emit a warning.
689                if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
690                        && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
691                    LOG.warn("Attempted to set the Traffic Class to "
692                        + this.trafficClass + " but the result Traffic Class was "
693                        + resultTrafficClass + ". Please check that your system "
694                        + "allows you to set the ECN bits (the first two bits).");
695                } else {
696                    LOG.warn("Attempted to set the Traffic Class to "
697                        + this.trafficClass + " but the result Traffic Class was "
698                        + resultTrafficClass + ". Please check that your system "
699                             + "supports java.net.setTrafficClass.");
700                }
701                return false;
702            }
703            // Reset the guards that prevent both the Differentiated Services
704            // option and the Type of Service option from being set on the same
705            // connection.
706            this.diffServChosen = false;
707            this.typeOfServiceChosen = false;
708            return true;
709        }
710    
711        public WireFormat getWireFormat() {
712            return wireFormat;
713        }
714    }