org.apache.activemq.transport.tcp
Class TcpTransport

java.lang.Object
  extended by org.apache.activemq.util.ServiceSupport
      extended by org.apache.activemq.transport.TransportSupport
          extended by org.apache.activemq.transport.TransportThreadSupport
              extended by org.apache.activemq.transport.tcp.TcpTransport
All Implemented Interfaces:
Runnable, Service, Transport
Direct Known Subclasses:
MQTTNIOTransport, NIOTransport, SslTransport, StompNIOTransport

public class TcpTransport
extends TransportThreadSupport
implements Transport, Service, Runnable

An implementation of the Transport interface using raw tcp/ip

Author:
David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)

Field Summary
protected  TimeStampStream buffOut
           
protected  boolean closeAsync
           
protected  int connectionTimeout
           
protected  DataInputStream dataIn
           
protected  DataOutputStream dataOut
           
protected  boolean diffServChosen
          Prevents setting both the Differentiated Services and Type of Service transport options at the same time, since they share the same spot in the TCP/IP packet headers.
protected  boolean dynamicManagement
          Specifies if the TransportLogger will be manageable by JMX or not.
protected  int ioBufferSize
           
protected  int jmxPort
          Specifies the port that will be used by the JMX server to manage the TransportLoggers.
protected  URI localLocation
           
protected  String logWriterName
          Name of the LogWriter implementation to use.
protected  int minmumWireFormatVersion
           
protected  URI remoteLocation
           
protected  Socket socket
           
protected  int socketBufferSize
           
protected  SocketFactory socketFactory
           
protected  int soTimeout
           
protected  boolean startLogging
          startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
protected  AtomicReference<CountDownLatch> stoppedLatch
           
protected  boolean trace
          trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, and therefore will never be able to print logging messages.
protected  int trafficClass
          The Traffic Class to be set on the socket.
protected  boolean typeOfServiceChosen
           
protected  boolean useLocalHost
           
protected  WireFormat wireFormat
           
 
Constructor Summary
TcpTransport(WireFormat wireFormat, Socket socket)
          Initialize from a server Socket
TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation)
          Connect to a remote Node - e.g.
 
Method Summary
protected  void closeStreams()
           
protected  void connect()
           
protected  void doRun()
           
protected  void doStart()
           
protected  void doStop(ServiceStopper stopper)
           
 int getConnectionTimeout()
           
 String getDiffServ()
           
 int getIoBufferSize()
           
 int getJmxPort()
           
 Boolean getKeepAlive()
           
 String getLogWriterName()
           
 int getMinmumWireFormatVersion()
           
 int getReceiveCounter()
          Returns a counter which gets incremented as data is read from the transport.
 String getRemoteAddress()
           
 int getSocketBufferSize()
           
 int getSoLinger()
           
 int getSoTimeout()
           
 Boolean getTcpNoDelay()
           
 int getTypeOfService()
           
 WireFormat getWireFormat()
           
protected  void initialiseSocket(Socket sock)
          Configures the socket for use
protected  void initializeStreams()
           
 boolean isCloseAsync()
           
 boolean isDynamicManagement()
           
 boolean isStartLogging()
           
 boolean isTrace()
           
 boolean isUseLocalHost()
           
<T> T
narrow(Class<T> target)
          narrow acceptance
 void oneway(Object command)
          A one way asynchronous send
protected  Object readCommand()
           
protected  String resolveHostName(String host)
           
 void run()
          reads packets from a Socket
 void setCloseAsync(boolean closeAsync)
           
 void setConnectionTimeout(int connectionTimeout)
          Sets the timeout used to connect to the socket
 void setDiffServ(String diffServ)
           
 void setDynamicManagement(boolean useJmx)
           
 void setIoBufferSize(int ioBufferSize)
           
 void setJmxPort(int jmxPort)
           
 void setKeepAlive(Boolean keepAlive)
          Enable/disable TCP KEEP_ALIVE mode
 void setLogWriterName(String logFormat)
           
 void setMinmumWireFormatVersion(int minmumWireFormatVersion)
           
 void setSocketBufferSize(int socketBufferSize)
          Sets the buffer size to use on the socket
 void setSocketOptions(Map<String,Object> socketOptions)
           
 void setSoLinger(int soLinger)
          Enable/disable soLinger
 void setSoTimeout(int soTimeout)
          Sets the socket timeout
 void setStartLogging(boolean startLogging)
           
 void setTcpNoDelay(Boolean tcpNoDelay)
          Enable/disable the TCP_NODELAY option on the socket
 void setTrace(boolean trace)
           
 void setTypeOfService(int typeOfService)
           
 void setUseLocalHost(boolean useLocalHost)
          Sets whether 'localhost' or the actual local host name should be used to make local connections.
 void stop()
          Override so that stop() blocks until the run thread is no longer running.
 String toString()
           
 
Methods inherited from class org.apache.activemq.transport.TransportThreadSupport
getStackSize, isDaemon, setDaemon, setStackSize
 
Methods inherited from class org.apache.activemq.transport.TransportSupport
asyncRequest, checkStarted, doConsume, getTransportListener, isConnected, isDisposed, isFaultTolerant, isReconnectSupported, isUpdateURIsSupported, onException, reconnect, request, request, setTransportListener, updateURIs
 
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.apache.activemq.transport.Transport
asyncRequest, getTransportListener, isConnected, isDisposed, isFaultTolerant, isReconnectSupported, isUpdateURIsSupported, reconnect, request, request, setTransportListener, updateURIs
 
Methods inherited from interface org.apache.activemq.Service
start
 

Field Detail

remoteLocation

protected final URI remoteLocation

localLocation

protected final URI localLocation

wireFormat

protected final WireFormat wireFormat

connectionTimeout

protected int connectionTimeout

soTimeout

protected int soTimeout

socketBufferSize

protected int socketBufferSize

ioBufferSize

protected int ioBufferSize

closeAsync

protected boolean closeAsync

socket

protected Socket socket

dataOut

protected DataOutputStream dataOut

dataIn

protected DataInputStream dataIn

buffOut

protected TimeStampStream buffOut

trafficClass

protected int trafficClass
The Traffic Class to be set on the socket.


diffServChosen

protected boolean diffServChosen
Prevents setting both the Differentiated Services and Type of Service transport options at the same time, since they share the same spot in the TCP/IP packet headers.


typeOfServiceChosen

protected boolean typeOfServiceChosen

trace

protected boolean trace
trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, and therefore will never be able to print logging messages. This parameter is most probably set in Connection or TransportConnector URIs.


logWriterName

protected String logWriterName
Name of the LogWriter implementation to use. Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably set in Connection or TransportConnector URIs.


dynamicManagement

protected boolean dynamicManagement
Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 TransportLogger which is manageable, a TransportLoggerControl MBean will me created.


startLogging

protected boolean startLogging
startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or TransportConnector URIs.


jmxPort

protected int jmxPort
Specifies the port that will be used by the JMX server to manage the TransportLoggers. This should only be set in an URI by a client (producer or consumer) since a broker will already create a JMX server. It is useful for people who test a broker and clients in the same machine and want to control both via JMX; a different port will be needed.


useLocalHost

protected boolean useLocalHost

minmumWireFormatVersion

protected int minmumWireFormatVersion

socketFactory

protected SocketFactory socketFactory

stoppedLatch

protected final AtomicReference<CountDownLatch> stoppedLatch
Constructor Detail

TcpTransport

public TcpTransport(WireFormat wireFormat,
                    SocketFactory socketFactory,
                    URI remoteLocation,
                    URI localLocation)
             throws UnknownHostException,
                    IOException
Connect to a remote Node - e.g. a Broker

Parameters:
wireFormat -
socketFactory -
remoteLocation -
localLocation - - e.g. local InetAddress and local port
Throws:
IOException
UnknownHostException

TcpTransport

public TcpTransport(WireFormat wireFormat,
                    Socket socket)
             throws IOException
Initialize from a server Socket

Parameters:
wireFormat -
socket -
Throws:
IOException
Method Detail

oneway

public void oneway(Object command)
            throws IOException
A one way asynchronous send

Specified by:
oneway in interface Transport
Throws:
IOException

toString

public String toString()
Overrides:
toString in class Object
Returns:
pretty print of 'this'

run

public void run()
reads packets from a Socket

Specified by:
run in interface Runnable

doRun

protected void doRun()
              throws IOException
Throws:
IOException

readCommand

protected Object readCommand()
                      throws IOException
Throws:
IOException

getDiffServ

public String getDiffServ()

setDiffServ

public void setDiffServ(String diffServ)
                 throws IllegalArgumentException
Throws:
IllegalArgumentException

getTypeOfService

public int getTypeOfService()

setTypeOfService

public void setTypeOfService(int typeOfService)

isTrace

public boolean isTrace()

setTrace

public void setTrace(boolean trace)

getLogWriterName

public String getLogWriterName()

setLogWriterName

public void setLogWriterName(String logFormat)

isDynamicManagement

public boolean isDynamicManagement()

setDynamicManagement

public void setDynamicManagement(boolean useJmx)

isStartLogging

public boolean isStartLogging()

setStartLogging

public void setStartLogging(boolean startLogging)

getJmxPort

public int getJmxPort()

setJmxPort

public void setJmxPort(int jmxPort)

getMinmumWireFormatVersion

public int getMinmumWireFormatVersion()

setMinmumWireFormatVersion

public void setMinmumWireFormatVersion(int minmumWireFormatVersion)

isUseLocalHost

public boolean isUseLocalHost()

setUseLocalHost

public void setUseLocalHost(boolean useLocalHost)
Sets whether 'localhost' or the actual local host name should be used to make local connections. On some operating systems such as Macs its not possible to connect as the local host name so localhost is better.


getSocketBufferSize

public int getSocketBufferSize()

setSocketBufferSize

public void setSocketBufferSize(int socketBufferSize)
Sets the buffer size to use on the socket


getSoTimeout

public int getSoTimeout()

setSoTimeout

public void setSoTimeout(int soTimeout)
Sets the socket timeout


getConnectionTimeout

public int getConnectionTimeout()

setConnectionTimeout

public void setConnectionTimeout(int connectionTimeout)
Sets the timeout used to connect to the socket


getKeepAlive

public Boolean getKeepAlive()

setKeepAlive

public void setKeepAlive(Boolean keepAlive)
Enable/disable TCP KEEP_ALIVE mode


setSoLinger

public void setSoLinger(int soLinger)
Enable/disable soLinger

Parameters:
soLinger - enabled if > -1, disabled if == -1, system default otherwise

getSoLinger

public int getSoLinger()

getTcpNoDelay

public Boolean getTcpNoDelay()

setTcpNoDelay

public void setTcpNoDelay(Boolean tcpNoDelay)
Enable/disable the TCP_NODELAY option on the socket


getIoBufferSize

public int getIoBufferSize()
Returns:
the ioBufferSize

setIoBufferSize

public void setIoBufferSize(int ioBufferSize)
Parameters:
ioBufferSize - the ioBufferSize to set

isCloseAsync

public boolean isCloseAsync()
Returns:
the closeAsync

setCloseAsync

public void setCloseAsync(boolean closeAsync)
Parameters:
closeAsync - the closeAsync to set

resolveHostName

protected String resolveHostName(String host)
                          throws UnknownHostException
Throws:
UnknownHostException

initialiseSocket

protected void initialiseSocket(Socket sock)
                         throws SocketException,
                                IllegalArgumentException
Configures the socket for use

Parameters:
sock -
Throws:
SocketException, - IllegalArgumentException if setting the options on the socket failed.
SocketException
IllegalArgumentException

doStart

protected void doStart()
                throws Exception
Overrides:
doStart in class TransportThreadSupport
Throws:
Exception

connect

protected void connect()
                throws Exception
Throws:
Exception

doStop

protected void doStop(ServiceStopper stopper)
               throws Exception
Specified by:
doStop in class ServiceSupport
Throws:
Exception

stop

public void stop()
          throws Exception
Override so that stop() blocks until the run thread is no longer running.

Specified by:
stop in interface Service
Overrides:
stop in class ServiceSupport
Throws:
Exception

initializeStreams

protected void initializeStreams()
                          throws Exception
Throws:
Exception

closeStreams

protected void closeStreams()
                     throws IOException
Throws:
IOException

setSocketOptions

public void setSocketOptions(Map<String,Object> socketOptions)

getRemoteAddress

public String getRemoteAddress()
Specified by:
getRemoteAddress in interface Transport
Returns:
the remote address for this connection

narrow

public <T> T narrow(Class<T> target)
Description copied from class: TransportSupport
narrow acceptance

Specified by:
narrow in interface Transport
Overrides:
narrow in class TransportSupport
Returns:
'this' if assignable

getReceiveCounter

public int getReceiveCounter()
Description copied from interface: Transport
Returns a counter which gets incremented as data is read from the transport. It should only be used to determine if there is progress being made in reading the next command from the transport. The value may wrap into the negative numbers.

Specified by:
getReceiveCounter in interface Transport
Returns:
a counter which gets incremented as data is read from the transport.

getWireFormat

public WireFormat getWireFormat()


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.