HDFS to Kafka Sync App

Summary

This application reads lines from a configured HDFS path and writes each line as a message to a configured Apache Kafka topic. This document is a step-by-step guide to launching, configuring, and customizing the application. The source code is available at: https://github.com/DataTorrent/app-templates/tree/master/hdfs-to-kafka-sync.

Please send feedback or feature requests to: feedback@datatorrent.com

Steps to launch application

  1. Click on the AppFactory tab from the top navigation bar. A page listing the applications available on AppFactory is displayed.
  2. Search for Kafka to see all applications related to Kafka.
  3. Click on the import button for HDFS to Kafka Sync App. A notification is displayed on the top right corner after the application package is successfully imported.

  4. Click on the link in the notification which navigates to the page for this application package.

    Detailed information about the application package, such as version, last modified time, and a short description, is available on this page. Click on the launch button for the HDFS to Kafka Sync application. In the confirmation modal, click the Configure button.

  5. The HDFS-to-Kafka-Sync application configuration page is displayed. The Required Properties section must be completed before the application can be launched.

    For example, suppose we wish to process lines from all files in /user/appuser/input on source-cluster and send the output to Kafka on kafka-server-node with the topic test. The properties should be set as follows:

    Name                                   Value
    Kafka Topic Name                       test
    Input Directory Or File Path On HDFS   /user/appuser/input
    Kafka Producer Properties              serializer.class=kafka.serializer.DefaultEncoder, producer.type=async, metadata.broker.list=kafka-server-node:9092

    Details about the configuration options are available in the Configuration options section.

  6. When you are finished inputting application configuration properties, click on the save button on the top right corner of the page to save the configuration.

  7. Click on the launch button at the top right corner to launch the application. After the application launches successfully, a notification is displayed on the top right corner. It includes the Application ID, which can be used to monitor this instance and find its logs.

  8. Click on the Monitor tab from the top navigation bar.

  9. A page listing all running applications is displayed. Search for the current application by name, application ID, or any other relevant field. Click on the application name or ID to navigate to the application instance details page.

  10. The application instance details page shows key metrics for monitoring the application status. The logical tab shows the application DAG, Stram events, operator status based on logical operators, stream status, and a chart with key metrics.

  11. Click on the physical tab to look at the status of the physical instances of the operators, containers, etc.
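
The Kafka Producer Properties value entered in step 5 is a single comma-separated string of key=value pairs. As a rough illustration of that format only (it is not how the application itself parses the value), the shell sketch below splits the example value into one pair per line. The helper name split_producer_props is hypothetical.

```shell
# Hypothetical helper: print one key=value pair per line from a
# comma-separated producer-properties string, trimming surrounding spaces
# and dropping empty entries.
split_producer_props() {
  printf '%s\n' "$1" \
    | tr ',' '\n' \
    | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//' -e '/^$/d'
}

# Example value taken from step 5 of the launch steps.
PRODUCER_PROPS='serializer.class=kafka.serializer.DefaultEncoder, producer.type=async, metadata.broker.list=kafka-server-node:9092'

split_producer_props "$PRODUCER_PROPS"
```

Listing the pairs this way can make it easier to spot a missing comma or a stray space before pasting the value into the configuration page.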

Configuration options

Mandatory properties

The end user must specify values for these properties.

dt.operator.kafkaOutput.prop.producerProperties
  Description: Properties for the Kafka producer
  Type: Comma-separated String
  Example: serializer.class=kafka.serializer.DefaultEncoder, producer.type=async, metadata.broker.list=kafka-server-node:9092

dt.operator.kafkaOutput.prop.topic
  Description: Kafka topic for output records
  Type: String
  Example: test

dt.operator.recordReader.prop.files
  Description: HDFS path for input file or directory
  Type: String
  Examples:
  • /user/appuser/input/directory1
  • /user/appuser/input/file2.log
  • hdfs://node1.corp1.com/user/appuser/input
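
As a minimal sketch, the three mandatory properties could be collected in a configuration file like the one written below. This assumes the Hadoop-style configuration XML layout (name/value pairs inside a configuration element) that the project's properties.xml is understood to use. The file name hdfs-to-kafka-mandatory.xml is hypothetical, and the values are the examples given above.

```shell
# Write the three mandatory properties to a configuration file.
# The file name is hypothetical; the values are the documented examples.
CONF_FILE="${CONF_FILE:-hdfs-to-kafka-mandatory.xml}"

cat > "$CONF_FILE" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <name>dt.operator.kafkaOutput.prop.producerProperties</name>
    <value>serializer.class=kafka.serializer.DefaultEncoder, producer.type=async, metadata.broker.list=kafka-server-node:9092</value>
  </property>
  <property>
    <name>dt.operator.kafkaOutput.prop.topic</name>
    <value>test</value>
  </property>
  <property>
    <name>dt.operator.recordReader.prop.files</name>
    <value>/user/appuser/input</value>
  </property>
</configuration>
EOF

# Sanity check: list the property names that were written.
grep -o '<name>[^<]*</name>' "$CONF_FILE"
```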

Advanced properties

Pre-saved configurations are provided for each application environment. Recommended settings for the DataTorrent sandbox edition are in sandbox-memory-conf.xml, and those for a cluster environment are in cluster-memory-conf.xml.

dt.operator.recordReader.prop.minReaders
  Description: Minimum number of BlockReader partitions for parallel reading
  Type: int
  Cluster default: 1
  Sandbox default: 1

dt.operator.recordReader.prop.maxReaders
  Description: Maximum number of BlockReader partitions for parallel reading
  Type: int
  Cluster default: 16
  Sandbox default: 1

dt.operator.kafkaOutput.attr.PARTITIONER
  Description: Partitioning for the Kafka output operator
  Type: String
  Cluster default: com.datatorrent.common.partitioner.StatelessPartitioner:16
  Sandbox default: com.datatorrent.common.partitioner.StatelessPartitioner:1

You can override the default values of the advanced properties by specifying custom values when you configure the application, as described in Steps to launch application.
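
One way to keep a set of overrides together is to generate a small configuration file, sketched below under the assumption that it uses the same Hadoop-style XML layout as properties.xml. The helper name property_xml, the file name advanced-overrides.xml, and the value 8 are all hypothetical and purely illustrative, not recommended settings.

```shell
# Hypothetical helper: print one <property> element for a name/value pair.
# Values are written as-is, so characters such as & or < would need
# XML escaping before being passed in.
property_xml() {
  printf '  <property>\n    <name>%s</name>\n    <value>%s</value>\n  </property>\n' "$1" "$2"
}

# Hypothetical output file holding the overrides.
OVERRIDE_FILE="${OVERRIDE_FILE:-advanced-overrides.xml}"

{
  printf '<?xml version="1.0"?>\n<configuration>\n'
  # Illustrative values only.
  property_xml dt.operator.recordReader.prop.maxReaders 8
  property_xml dt.operator.kafkaOutput.attr.PARTITIONER com.datatorrent.common.partitioner.StatelessPartitioner:8
  printf '</configuration>\n'
} > "$OVERRIDE_FILE"

cat "$OVERRIDE_FILE"
```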

Steps to customize the application

  1. Make sure the required utilities are installed on your machine and available on PATH. The steps below use git and maven.

  2. Use the following command to clone the app-templates repository:

    git clone git@github.com:DataTorrent/app-templates.git

  3. Change directory to app-templates/hdfs-to-kafka-sync:

    cd app-templates/hdfs-to-kafka-sync

  4. Import this Maven project into your favorite IDE (e.g. Eclipse).

  5. Change the source code as per your requirements. Some tips are given as commented blocks in Application.java for this project.

  6. Make respective changes in the test case and properties.xml based on your environment.

  7. Compile this project using maven:

    mvn clean package

    This will generate the application package with the .apa extension inside the target directory.

  8. Open the DataTorrent UI Management console in a web browser. Click on the Develop tab from the top navigation bar.

  9. Click on Application Packages from the list.

  10. Click on the upload package button and upload the generated .apa file.

  11. The application package page is shown, listing all packages. Click on the Launch button for the uploaded application package, then follow the steps in Steps to launch application.
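
The command-line parts of the steps above (checking the utilities in step 1 and locating the package built in step 7) can be sketched with two small shell helpers. The function names check_tools and find_apa are hypothetical, and the sketch assumes the build places the .apa file directly inside the target directory, as step 7 describes.

```shell
# Hypothetical helper: report any required command-line tool that is
# missing from PATH. Returns non-zero if at least one tool is missing.
check_tools() {
  missing=0
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Hypothetical helper: list application packages (.apa files) found
# directly inside the given directory (defaults to ./target).
find_apa() {
  find "${1:-target}" -maxdepth 1 -type f -name '*.apa'
}

# Typical use from the project directory (not executed here):
#   check_tools git mvn && mvn clean package && find_apa
```

Running check_tools first gives a clear message about a missing tool instead of a failure partway through the build.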