001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import org.apache.activemq.Service;
020    import org.apache.activemq.TransportLoggerSupport;
021    import org.apache.activemq.thread.TaskRunnerFactory;
022    import org.apache.activemq.transport.Transport;
023    import org.apache.activemq.transport.TransportThreadSupport;
024    import org.apache.activemq.util.InetAddressUtil;
025    import org.apache.activemq.util.IntrospectionSupport;
026    import org.apache.activemq.util.ServiceStopper;
027    import org.apache.activemq.wireformat.WireFormat;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    import javax.net.SocketFactory;
032    import java.io.DataInputStream;
033    import java.io.DataOutputStream;
034    import java.io.IOException;
035    import java.io.InterruptedIOException;
036    import java.net.*;
037    import java.util.HashMap;
038    import java.util.Map;
039    import java.util.concurrent.CountDownLatch;
040    import java.util.concurrent.TimeUnit;
041    import java.util.concurrent.atomic.AtomicReference;
042    
043    /**
044     * An implementation of the {@link Transport} interface using raw tcp/ip
045     *
046     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
047     *
048     */
049    public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
050        private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
051        protected final URI remoteLocation;
052        protected final URI localLocation;
053        protected final WireFormat wireFormat;
054    
055        protected int connectionTimeout = 30000;
056        protected int soTimeout;
057        protected int socketBufferSize = 64 * 1024;
058        protected int ioBufferSize = 8 * 1024;
059        protected boolean closeAsync=true;
060        protected Socket socket;
061        protected DataOutputStream dataOut;
062        protected DataInputStream dataIn;
063        protected TimeStampStream buffOut = null;
064    
065        /**
066         * The Traffic Class to be set on the socket.
067         */
068        protected int trafficClass = 0;
069        /**
070         * Keeps track of attempts to set the Traffic Class on the socket.
071         */
072        private boolean trafficClassSet = false;
073        /**
074         * Prevents setting both the Differentiated Services and Type of Service
075         * transport options at the same time, since they share the same spot in
076         * the TCP/IP packet headers.
077         */
078        protected boolean diffServChosen = false;
079        protected boolean typeOfServiceChosen = false;
080        /**
081         * trace=true -> the Transport stack where this TcpTransport
082         * object will be, will have a TransportLogger layer
083         * trace=false -> the Transport stack where this TcpTransport
084         * object will be, will NOT have a TransportLogger layer, and therefore
085         * will never be able to print logging messages.
086         * This parameter is most probably set in Connection or TransportConnector URIs.
087         */
088        protected boolean trace = false;
089        /**
090         * Name of the LogWriter implementation to use.
091         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
092         * This parameter is most probably set in Connection or TransportConnector URIs.
093         */
094        protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
095        /**
096         * Specifies if the TransportLogger will be manageable by JMX or not.
097         * Also, as long as there is at least 1 TransportLogger which is manageable,
098         * a TransportLoggerControl MBean will me created.
099         */
100        protected boolean dynamicManagement = false;
101        /**
102         * startLogging=true -> the TransportLogger object of the Transport stack
103         * will initially write messages to the log.
104         * startLogging=false -> the TransportLogger object of the Transport stack
105         * will initially NOT write messages to the log.
106         * This parameter only has an effect if trace == true.
107         * This parameter is most probably set in Connection or TransportConnector URIs.
108         */
109        protected boolean startLogging = true;
110        /**
111         * Specifies the port that will be used by the JMX server to manage
112         * the TransportLoggers.
113         * This should only be set in an URI by a client (producer or consumer) since
114         * a broker will already create a JMX server.
115         * It is useful for people who test a broker and clients in the same machine
116         * and want to control both via JMX; a different port will be needed.
117         */
118        protected int jmxPort = 1099;
119        protected boolean useLocalHost = false;
120        protected int minmumWireFormatVersion;
121        protected SocketFactory socketFactory;
122        protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
123        protected volatile int receiveCounter;
124    
125        private Map<String, Object> socketOptions;
126        private int soLinger = Integer.MIN_VALUE;
127        private Boolean keepAlive;
128        private Boolean tcpNoDelay;
129        private Thread runnerThread;
130    
131        /**
132         * Connect to a remote Node - e.g. a Broker
133         *
134         * @param wireFormat
135         * @param socketFactory
136         * @param remoteLocation
137         * @param localLocation - e.g. local InetAddress and local port
138         * @throws IOException
139         * @throws UnknownHostException
140         */
141        public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
142                            URI localLocation) throws UnknownHostException, IOException {
143            this.wireFormat = wireFormat;
144            this.socketFactory = socketFactory;
145            try {
146                this.socket = socketFactory.createSocket();
147            } catch (SocketException e) {
148                this.socket = null;
149            }
150            this.remoteLocation = remoteLocation;
151            this.localLocation = localLocation;
152            setDaemon(false);
153        }
154    
155        /**
156         * Initialize from a server Socket
157         *
158         * @param wireFormat
159         * @param socket
160         * @throws IOException
161         */
162        public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
163            this.wireFormat = wireFormat;
164            this.socket = socket;
165            this.remoteLocation = null;
166            this.localLocation = null;
167            setDaemon(true);
168        }
169    
170        /**
171         * A one way asynchronous send
172         */
173        public void oneway(Object command) throws IOException {
174            checkStarted();
175            wireFormat.marshal(command, dataOut);
176            dataOut.flush();
177        }
178    
179        /**
180         * @return pretty print of 'this'
181         */
182        @Override
183        public String toString() {
184            return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort()
185                    : (localLocation != null ? localLocation : remoteLocation)) ;
186        }
187    
188        /**
189         * reads packets from a Socket
190         */
191        public void run() {
192            LOG.trace("TCP consumer thread for " + this + " starting");
193            this.runnerThread=Thread.currentThread();
194            try {
195                while (!isStopped()) {
196                    doRun();
197                }
198            } catch (IOException e) {
199                stoppedLatch.get().countDown();
200                onException(e);
201            } catch (Throwable e){
202                stoppedLatch.get().countDown();
203                IOException ioe=new IOException("Unexpected error occured: " + e);
204                ioe.initCause(e);
205                onException(ioe);
206            }finally {
207                stoppedLatch.get().countDown();
208            }
209        }
210    
211        protected void doRun() throws IOException {
212            try {
213                Object command = readCommand();
214                doConsume(command);
215            } catch (SocketTimeoutException e) {
216            } catch (InterruptedIOException e) {
217            }
218        }
219    
220        protected Object readCommand() throws IOException {
221            return wireFormat.unmarshal(dataIn);
222        }
223    
224        // Properties
225        // -------------------------------------------------------------------------
226        public String getDiffServ() {
227            // This is the value requested by the user by setting the Tcp Transport
228            // options. If the socket hasn't been created, then this value may not
229            // reflect the value returned by Socket.getTrafficClass().
230            return Integer.toString(this.trafficClass);
231        }
232    
233        public void setDiffServ(String diffServ) throws IllegalArgumentException {
234            this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
235            this.diffServChosen = true;
236        }
237    
238        public int getTypeOfService() {
239            // This is the value requested by the user by setting the Tcp Transport
240            // options. If the socket hasn't been created, then this value may not
241            // reflect the value returned by Socket.getTrafficClass().
242            return this.trafficClass;
243        }
244    
245        public void setTypeOfService(int typeOfService) {
246            this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
247            this.typeOfServiceChosen = true;
248        }
249    
250        public boolean isTrace() {
251            return trace;
252        }
253    
254        public void setTrace(boolean trace) {
255            this.trace = trace;
256        }
257    
258        public String getLogWriterName() {
259            return logWriterName;
260        }
261    
262        public void setLogWriterName(String logFormat) {
263            this.logWriterName = logFormat;
264        }
265    
266        public boolean isDynamicManagement() {
267            return dynamicManagement;
268        }
269    
270        public void setDynamicManagement(boolean useJmx) {
271            this.dynamicManagement = useJmx;
272        }
273    
274        public boolean isStartLogging() {
275            return startLogging;
276        }
277    
278        public void setStartLogging(boolean startLogging) {
279            this.startLogging = startLogging;
280        }
281    
282        public int getJmxPort() {
283            return jmxPort;
284        }
285    
286        public void setJmxPort(int jmxPort) {
287            this.jmxPort = jmxPort;
288        }
289    
290        public int getMinmumWireFormatVersion() {
291            return minmumWireFormatVersion;
292        }
293    
294        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
295            this.minmumWireFormatVersion = minmumWireFormatVersion;
296        }
297    
298        public boolean isUseLocalHost() {
299            return useLocalHost;
300        }
301    
302        /**
303         * Sets whether 'localhost' or the actual local host name should be used to
304         * make local connections. On some operating systems such as Macs its not
305         * possible to connect as the local host name so localhost is better.
306         */
307        public void setUseLocalHost(boolean useLocalHost) {
308            this.useLocalHost = useLocalHost;
309        }
310    
311        public int getSocketBufferSize() {
312            return socketBufferSize;
313        }
314    
315        /**
316         * Sets the buffer size to use on the socket
317         */
318        public void setSocketBufferSize(int socketBufferSize) {
319            this.socketBufferSize = socketBufferSize;
320        }
321    
322        public int getSoTimeout() {
323            return soTimeout;
324        }
325    
326        /**
327         * Sets the socket timeout
328         */
329        public void setSoTimeout(int soTimeout) {
330            this.soTimeout = soTimeout;
331        }
332    
333        public int getConnectionTimeout() {
334            return connectionTimeout;
335        }
336    
337        /**
338         * Sets the timeout used to connect to the socket
339         */
340        public void setConnectionTimeout(int connectionTimeout) {
341            this.connectionTimeout = connectionTimeout;
342        }
343    
344        public Boolean getKeepAlive() {
345            return keepAlive;
346        }
347    
348        /**
349         * Enable/disable TCP KEEP_ALIVE mode
350         */
351        public void setKeepAlive(Boolean keepAlive) {
352            this.keepAlive = keepAlive;
353        }
354    
355        /**
356         * Enable/disable soLinger
357         * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
358         */
359        public void setSoLinger(int soLinger) {
360            this.soLinger = soLinger;
361        }
362    
363        public int getSoLinger() {
364            return soLinger;
365        }
366    
367        public Boolean getTcpNoDelay() {
368            return tcpNoDelay;
369        }
370    
371        /**
372         * Enable/disable the TCP_NODELAY option on the socket
373         */
374        public void setTcpNoDelay(Boolean tcpNoDelay) {
375            this.tcpNoDelay = tcpNoDelay;
376        }
377    
378        /**
379         * @return the ioBufferSize
380         */
381        public int getIoBufferSize() {
382            return this.ioBufferSize;
383        }
384    
385        /**
386         * @param ioBufferSize the ioBufferSize to set
387         */
388        public void setIoBufferSize(int ioBufferSize) {
389            this.ioBufferSize = ioBufferSize;
390        }
391    
392        /**
393         * @return the closeAsync
394         */
395        public boolean isCloseAsync() {
396            return closeAsync;
397        }
398    
399        /**
400         * @param closeAsync the closeAsync to set
401         */
402        public void setCloseAsync(boolean closeAsync) {
403            this.closeAsync = closeAsync;
404        }
405    
406        // Implementation methods
407        // -------------------------------------------------------------------------
408        protected String resolveHostName(String host) throws UnknownHostException {
409            if (isUseLocalHost()) {
410                String localName = InetAddressUtil.getLocalHostName();
411                if (localName != null && localName.equals(host)) {
412                    return "localhost";
413                }
414            }
415            return host;
416        }
417    
418        /**
419         * Configures the socket for use
420         *
421         * @param sock
422         * @throws SocketException, IllegalArgumentException if setting the options
423         *         on the socket failed.
424         */
425        protected void initialiseSocket(Socket sock) throws SocketException,
426                IllegalArgumentException {
427            if (socketOptions != null) {
428                IntrospectionSupport.setProperties(socket, socketOptions);
429            }
430    
431            try {
432                sock.setReceiveBufferSize(socketBufferSize);
433                sock.setSendBufferSize(socketBufferSize);
434            } catch (SocketException se) {
435                LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
436                LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
437            }
438            sock.setSoTimeout(soTimeout);
439    
440            if (keepAlive != null) {
441                sock.setKeepAlive(keepAlive.booleanValue());
442            }
443    
444            if (soLinger > -1) {
445                sock.setSoLinger(true, soLinger);
446            } else if (soLinger == -1) {
447                sock.setSoLinger(false, 0);
448            }
449            if (tcpNoDelay != null) {
450                sock.setTcpNoDelay(tcpNoDelay.booleanValue());
451            }
452            if (!this.trafficClassSet) {
453                this.trafficClassSet = setTrafficClass(sock);
454            }
455        }
456    
457        @Override
458        protected void doStart() throws Exception {
459            connect();
460            stoppedLatch.set(new CountDownLatch(1));
461            super.doStart();
462        }
463    
464        protected void connect() throws Exception {
465    
466            if (socket == null && socketFactory == null) {
467                throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
468            }
469    
470            InetSocketAddress localAddress = null;
471            InetSocketAddress remoteAddress = null;
472    
473            if (localLocation != null) {
474                localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
475                                                     localLocation.getPort());
476            }
477    
478            if (remoteLocation != null) {
479                String host = resolveHostName(remoteLocation.getHost());
480                remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
481            }
482            // Set the traffic class before the socket is connected when possible so
483            // that the connection packets are given the correct traffic class.
484            this.trafficClassSet = setTrafficClass(socket);
485    
486            if (socket != null) {
487    
488                if (localAddress != null) {
489                    socket.bind(localAddress);
490                }
491    
492                // If it's a server accepted socket.. we don't need to connect it
493                // to a remote address.
494                if (remoteAddress != null) {
495                    if (connectionTimeout >= 0) {
496                        socket.connect(remoteAddress, connectionTimeout);
497                    } else {
498                        socket.connect(remoteAddress);
499                    }
500                }
501    
502            } else {
503                // For SSL sockets.. you can't create an unconnected socket :(
504                // This means the timout option are not supported either.
505                if (localAddress != null) {
506                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
507                                                        localAddress.getAddress(), localAddress.getPort());
508                } else {
509                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
510                }
511            }
512    
513            initialiseSocket(socket);
514            initializeStreams();
515        }
516    
517        @Override
518        protected void doStop(ServiceStopper stopper) throws Exception {
519            if (LOG.isDebugEnabled()) {
520                LOG.debug("Stopping transport " + this);
521            }
522    
523            // Closing the streams flush the sockets before closing.. if the socket
524            // is hung.. then this hangs the close.
525            // closeStreams();
526            if (socket != null) {
527                if (closeAsync) {
528                    //closing the socket can hang also
529                    final CountDownLatch latch = new CountDownLatch(1);
530    
531                    // need a async task for this
532                    final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
533                    taskRunnerFactory.execute(new Runnable() {
534                        public void run() {
535                            LOG.trace("Closing socket {}", socket);
536                            try {
537                                socket.close();
538                                LOG.debug("Closed socket {}", socket);
539                            } catch (IOException e) {
540                                if (LOG.isDebugEnabled()) {
541                                    LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
542                                }
543                            } finally {
544                                latch.countDown();
545                            }
546                        }
547                    });
548    
549                    try {
550                        latch.await(1,TimeUnit.SECONDS);
551                    } catch (InterruptedException e) {
552                        Thread.currentThread().interrupt();
553                    } finally {
554                        taskRunnerFactory.shutdownNow();
555                    }
556    
557                } else {
558                    // close synchronously
559                    LOG.trace("Closing socket {}", socket);
560                    try {
561                        socket.close();
562                        LOG.debug("Closed socket {}", socket);
563                    } catch (IOException e) {
564                        if (LOG.isDebugEnabled()) {
565                            LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
566                        }
567                    }
568                }
569            }
570        }
571    
572        /**
573         * Override so that stop() blocks until the run thread is no longer running.
574         */
575        @Override
576        public void stop() throws Exception {
577            super.stop();
578            CountDownLatch countDownLatch = stoppedLatch.get();
579            if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
580                countDownLatch.await(1,TimeUnit.SECONDS);
581            }
582        }
583    
584        protected void initializeStreams() throws Exception {
585            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
586                @Override
587                public int read() throws IOException {
588                    receiveCounter++;
589                    return super.read();
590                }
591                @Override
592                public int read(byte[] b, int off, int len) throws IOException {
593                    receiveCounter++;
594                    return super.read(b, off, len);
595                }
596                @Override
597                public long skip(long n) throws IOException {
598                    receiveCounter++;
599                    return super.skip(n);
600                }
601                @Override
602                protected void fill() throws IOException {
603                    receiveCounter++;
604                    super.fill();
605                }
606            };
607            this.dataIn = new DataInputStream(buffIn);
608            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
609            this.dataOut = new DataOutputStream(outputStream);
610            this.buffOut = outputStream;
611        }
612    
613        protected void closeStreams() throws IOException {
614            if (dataOut != null) {
615                dataOut.close();
616            }
617            if (dataIn != null) {
618                dataIn.close();
619            }
620        }
621    
622        public void setSocketOptions(Map<String, Object> socketOptions) {
623            this.socketOptions = new HashMap<String, Object>(socketOptions);
624        }
625    
626        public String getRemoteAddress() {
627            if (socket != null) {
628                SocketAddress address = socket.getRemoteSocketAddress();
629                if (address instanceof InetSocketAddress) {
630                    return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
631                } else {
632                    return "" + socket.getRemoteSocketAddress();
633                }
634            }
635            return null;
636        }
637    
638        @Override
639        public <T> T narrow(Class<T> target) {
640            if (target == Socket.class) {
641                return target.cast(socket);
642            } else if ( target == TimeStampStream.class) {
643                return target.cast(buffOut);
644            }
645            return super.narrow(target);
646        }
647    
648        public int getReceiveCounter() {
649            return receiveCounter;
650        }
651    
652        /**
653         * @param sock The socket on which to set the Traffic Class.
654         * @return Whether or not the Traffic Class was set on the given socket.
655         * @throws SocketException if the system does not support setting the
656         *         Traffic Class.
657         * @throws IllegalArgumentException if both the Differentiated Services and
658         *         Type of Services transport options have been set on the same
659         *         connection.
660         */
661        private boolean setTrafficClass(Socket sock) throws SocketException,
662                IllegalArgumentException {
663            if (sock == null
664                || (!this.diffServChosen && !this.typeOfServiceChosen)) {
665                return false;
666            }
667            if (this.diffServChosen && this.typeOfServiceChosen) {
668                throw new IllegalArgumentException("Cannot set both the "
669                    + " Differentiated Services and Type of Services transport "
670                    + " options on the same connection.");
671            }
672    
673            sock.setTrafficClass(this.trafficClass);
674    
675            int resultTrafficClass = sock.getTrafficClass();
676            if (this.trafficClass != resultTrafficClass) {
677                // In the case where the user has specified the ECN bits (e.g. in
678                // Type of Service) but the system won't allow the ECN bits to be
679                // set or in the case where setting the traffic class failed for
680                // other reasons, emit a warning.
681                if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
682                        && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
683                    LOG.warn("Attempted to set the Traffic Class to "
684                        + this.trafficClass + " but the result Traffic Class was "
685                        + resultTrafficClass + ". Please check that your system "
686                        + "allows you to set the ECN bits (the first two bits).");
687                } else {
688                    LOG.warn("Attempted to set the Traffic Class to "
689                        + this.trafficClass + " but the result Traffic Class was "
690                        + resultTrafficClass + ". Please check that your system "
691                             + "supports java.net.setTrafficClass.");
692                }
693                return false;
694            }
695            // Reset the guards that prevent both the Differentiated Services
696            // option and the Type of Service option from being set on the same
697            // connection.
698            this.diffServChosen = false;
699            this.typeOfServiceChosen = false;
700            return true;
701        }
702    
703        public WireFormat getWireFormat() {
704            return wireFormat;
705        }
706    }