About TCP Input Operator

The TCP Input operator accepts concurrent TCP connections. It can also parse the received data using more than one delimiter.

This operator can accept data directly from IoT devices and from other TCP clients, such as agents running on multiple nodes.
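As an illustration of the client side, the following is a minimal sketch of an agent that sends delimiter-terminated messages over a single TCP connection. It uses only the Java standard library. The class name TcpLineClient, the sample readings, and the newline delimiter are assumptions made for this example; use the host, port, and delimiter that match your deployment.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical client-side sketch; it is not part of the operator's API. */
public class TcpLineClient {

    /** Builds one payload in which every message is followed by the delimiter. */
    static byte[] frame(List<String> messages, String delimiter) {
        StringBuilder payload = new StringBuilder();
        for (String message : messages) {
            payload.append(message).append(delimiter);
        }
        return payload.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Opens one TCP connection, writes all messages, then closes the connection. */
    static void send(String host, int port, List<String> messages, String delimiter) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(frame(messages, delimiter));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Sample readings are made up; the newline delimiter is an assumption.
        List<String> readings = Arrays.asList("sensor-1,21.5", "sensor-2,19.8");
        if (args.length < 2) {
            System.out.println("usage: java TcpLineClient <host> <port>");
            return;
        }
        send(args[0], Integer.parseInt(args[1]), readings, "\n");
    }
}
```

Run it as `java TcpLineClient <host> <port>`, where the port is the one the operator listens on.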

This operator is available under the DT Plus license.


The following port is available on the TCP Input operator:

Port Type | Port Name | Details
Output Port | outputPort | Emits tuples received on TCP connections.


The TCP Input operator has been tested with the following components:

  • Cloudera Hadoop version 5.8.2 along with Apache Hadoop version 2.6.0
  • Java version 1.7 and above
  • Netty


The following properties must be updated in the properties.xml file for this operator:

Property Name | Description | Example
queueSize | Defines the queue size to hold messages before emitting tuples. | dt.operator.[OPNAME].prop.queueSize
maxFrameLength | Specifies the maximum size of the decoded frames. | dt.operator.[OPNAME].prop.maxFrameLength
port | Defines the listening TCP port that is used by the server. | dt.operator.[OPNAME].prop.port
threadCount | Specifies the number of parallel threads. | dt.operator.[OPNAME].prop.threadCount
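To make the idea of multi-delimiter parsing with a maximum frame length more concrete, here is a conceptual sketch in plain Java. It does not reproduce the operator's internals, which are not shown in this document. The class DelimiterFramer, its split method, and the choice to drop oversized frames are assumptions made for illustration, and frame length is measured in characters here for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Conceptual sketch only; class and method names are hypothetical. */
public class DelimiterFramer {

    /**
     * Splits the input into frames at the earliest occurrence of any delimiter.
     * Frames longer than maxFrameLength are dropped. Trailing text without a
     * delimiter is treated as an incomplete frame and is not returned.
     */
    static List<String> split(String input, List<String> delimiters, int maxFrameLength) {
        List<String> frames = new ArrayList<>();
        int start = 0;
        while (start < input.length()) {
            int bestIndex = -1;
            int bestLength = 0;
            for (String delimiter : delimiters) {
                if (delimiter.isEmpty()) {
                    continue; // an empty delimiter would never advance the scan
                }
                int index = input.indexOf(delimiter, start);
                if (index >= 0 && (bestIndex < 0 || index < bestIndex)) {
                    bestIndex = index;
                    bestLength = delimiter.length();
                }
            }
            if (bestIndex < 0) {
                break; // no complete frame is left in the input
            }
            String frame = input.substring(start, bestIndex);
            if (frame.length() <= maxFrameLength) {
                frames.add(frame);
            }
            start = bestIndex + bestLength;
        }
        return frames;
    }

    public static void main(String[] args) {
        // Two delimiters (";" and newline) and a maximum frame length of 4.
        List<String> frames = split("a;bb\nccccc;d", Arrays.asList(";", "\n"), 4);
        System.out.println(frames); // prints [a, bb]
    }
}
```

In this sample, "ccccc" is dropped because it exceeds the length limit, and the trailing "d" is held back because no delimiter follows it.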


The TCP Input operator can be partitioned using the default partitioner provided by Apex. It can be dynamically partitioned when rules are stateless.

To partition the operator, you must add the following property to the properties.xml file. For example, if you add TcpInputOperator to the DAG with the name TcpInputOperatorEx, this property creates four partitions of the operator when the application starts.


Using the Operator

The TCP Input operator starts a TCP server on the configured port. When the operator is used with multiple partitions, the partitions cannot be deployed on the same node. You must therefore set an appropriate anti-affinity rule for the TCP Input operator.
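The text does not say why partitions cannot share a node. A likely reason, assumed here, is that every partition starts its server on the same configured port, and a host normally allows only one listener per port. The sketch below shows that general socket behavior with the Java standard library. PortConflictDemo is a hypothetical name, and the code does not use any operator classes.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

/** Generic socket demonstration; it does not use any operator classes. */
public class PortConflictDemo {

    /** Returns true when a second listener on an already used port is rejected. */
    static boolean secondBindFails() throws IOException {
        try (ServerSocket first = new ServerSocket(0)) { // 0 picks any free port
            int port = first.getLocalPort();
            try (ServerSocket second = new ServerSocket(port)) {
                return false; // both listeners coexist (not expected)
            } catch (BindException e) {
                return true; // the port is already taken on this host
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second bind rejected: " + secondBindFails());
    }
}
```

On common platforms the second bind fails with a BindException, which is the kind of conflict an anti-affinity rule avoids by placing each partition on a different node.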

The following code illustrates how this operator can be used in an application:

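// Add the TCP Input operator to the DAG under the name "TCPInput".
TcpInputOperator tcpInput = dag.addOperator("TCPInput", TcpInputOperator.class);
// Anti-affinity rule that keeps "TCPInput" partitions on separate nodes; it must still be registered on the DAG.
AffinityRule tcpRule = new AffinityRule(AffinityRule.Type.ANTI_AFFINITY, DAG.Locality.NODE_LOCAL, false, "TCPInput", "TCPInput");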