Kafka to Kafka Filter Transform Application

Summary

This application demonstrates continuous ingestion of streaming data into a big data lake. It uses Kafka both as the streaming source and as the data lake destination: each message read from the input topic is parsed from JSON, kept only if it satisfies the filter condition, transformed, and written to the output topic. Depending on the context, the data in Kafka could come from event logs, sensor readings, call detail records, transactions, and so on.
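To make the stage order concrete, here is a minimal, Kafka-free sketch in Java. It is not the application's code and uses no DataTorrent or Kafka API: the input and output topics are modelled as in-memory lists, and messages are assumed to be simple comma-separated strings rather than JSON so the example needs only the standard library. The class and method names (FilterTransformPipeline, run) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the stage order: input topic -> parse -> filter -> transform -> output topic.
// Topics are modelled as in-memory lists; nothing here is the DataTorrent or Kafka API.
public class FilterTransformPipeline {

    /** Runs every record of the "input topic" through parse, filter, and transform, in that order. */
    static <R, T, U> List<U> run(List<R> inputTopic,
                                 Function<R, T> parse,
                                 Predicate<T> filter,
                                 Function<T, U> transform) {
        List<U> outputTopic = new ArrayList<>();
        for (R raw : inputTopic) {
            T tuple = parse.apply(raw);                  // raw message -> structured tuple
            if (filter.test(tuple)) {                    // drop tuples that fail the condition
                outputTopic.add(transform.apply(tuple)); // reshape and "publish" to the output list
            }
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        // Assumed message format "policyNumber,customerName,premium" (stand-in for JSON).
        List<String> input = List.of("1,alice,60000", "2,bob,20000");
        List<String> output = run(
                input,
                line -> line.split(","),
                fields -> Long.parseLong(fields[2]) >= 50000,
                fields -> fields[0] + "," + fields[1].toUpperCase() + "," + fields[2]);
        System.out.println(output); // prints: [1,ALICE,60000]
    }
}
```

Keeping parse, filter, and transform as separate functions mirrors how the application exposes each stage through its own property.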

Required Properties

The end user must specify values for these properties.

Property | Type | Example | Notes
Filter Condition For Tuples | String | ({$}.getPremium() >= 50000) | Quasi-Java boolean expression; {$} refers to the incoming tuple
Kafka Output Topic Name | String | output_transactions | Name of the output topic on Kafka
Json Parser Field Info | String | {"policyNumber":"LONG", "customerName":"STRING", "premium":"LONG"} | JSON map whose keys are input field names and whose values are the data types of those fields
Kafka Broker List | String | localhost:9092 or node1.corp1.com:9092, node2.corp1.com:9092 | Comma-separated list of Kafka brokers
Kafka Input Topic Name | String | transactions | Name of the input topic on Kafka
Kafka Producer Properties | String | serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092 | Producer properties for the Kafka output
Transform Expression Info | String | {"customerName":"{$}.getCustomerName().toUpperCase()"} | JSON map whose keys are output field names and whose values are the expressions used to compute them
Transform Output field Info | String | {"policyNumber":"LONG", "customerName":"STRING", "premium":"LONG"} | JSON map whose keys are output field names and whose values are the data types of those fields
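The example filter and transform expressions can be read as ordinary Java calls on the parsed tuple. The sketch below is a hedged illustration under assumptions: Transaction is a hypothetical POJO whose getters match the example Json Parser Field Info, {$} is treated as a plain placeholder for the current tuple, and fields with no entry in Transform Expression Info (policyNumber, premium) are assumed to be copied to the output unchanged.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical sketch: the example quasi-Java expressions rewritten as plain Java.
// Transaction is an assumed POJO matching the example Json Parser Field Info; it is not part of the application.
public class ExpressionSketch {

    /** Assumed tuple type: LONG maps to long, STRING maps to String. */
    static final class Transaction {
        private final long policyNumber;
        private final String customerName;
        private final long premium;

        Transaction(long policyNumber, String customerName, long premium) {
            this.policyNumber = policyNumber;
            this.customerName = customerName;
            this.premium = premium;
        }

        long getPolicyNumber() { return policyNumber; }
        String getCustomerName() { return customerName; }
        long getPremium() { return premium; }

        @Override
        public String toString() {
            return "Transaction[policyNumber=" + policyNumber
                    + ", customerName=" + customerName + ", premium=" + premium + "]";
        }
    }

    /** Shows the Java an expression reads as once the {$} placeholder is replaced by a variable name. */
    static String toJavaSource(String quasiJava, String tupleVariable) {
        return quasiJava.replace("{$}", tupleVariable);
    }

    // Filter Condition For Tuples: ({$}.getPremium() >= 50000)
    static final Predicate<Transaction> FILTER = t -> t.getPremium() >= 50000;

    // Transform Expression Info: {"customerName":"{$}.getCustomerName().toUpperCase()"}
    // policyNumber and premium have no expression, so this sketch copies them unchanged (assumption).
    static final UnaryOperator<Transaction> TRANSFORM =
            t -> new Transaction(t.getPolicyNumber(), t.getCustomerName().toUpperCase(), t.getPremium());

    public static void main(String[] args) {
        System.out.println(toJavaSource("({$}.getPremium() >= 50000)", "tuple"));
        // prints: (tuple.getPremium() >= 50000)

        Transaction tuple = new Transaction(1001L, "jane doe", 75000L);
        if (FILTER.test(tuple)) {
            System.out.println(TRANSFORM.apply(tuple));
            // prints: Transaction[policyNumber=1001, customerName=JANE DOE, premium=75000]
        }
    }
}
```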

Advanced Properties (optional)

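Property | Default | Type | Example | Notes
Initial Offset Of Topic For Kafka Consumer | LATEST | String | EARLIEST, LATEST, APPLICATION_OR_EARLIEST, or APPLICATION_OR_LATEST | Where to start reading the input topic: EARLIEST reads from the beginning and LATEST from the current offset; the APPLICATION_OR_* values resume from the offsets previously committed by the application, falling back to the earliest or latest offset when none exist
Number Of Partitions For Kafka Consumer | com.datatorrent.common.partitioner.StatelessPartitioner:1 | String | com.datatorrent.common.partitioner.StatelessPartitioner:16 | Number of parallel instances of the Kafka input operator, given after the colon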
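The advanced properties can be sketched the same way. The Java below is an illustrative model under stated assumptions, not the Kafka operator's implementation: InitialOffset, startOffset, and partitionCount are hypothetical names, a committed offset stands for one saved by an earlier run of the application, and the partitioner value is assumed to follow the class:count form shown in the table.

```java
import java.util.OptionalLong;

// Hypothetical model of the advanced consumer properties; names and logic are illustrative assumptions,
// not the Kafka input operator's actual code.
public class ConsumerSettingsSketch {

    enum InitialOffset { EARLIEST, LATEST, APPLICATION_OR_EARLIEST, APPLICATION_OR_LATEST }

    /**
     * Picks the offset to start reading a topic partition from.
     * committed is the offset the application saved in an earlier run, if any.
     */
    static long startOffset(InitialOffset mode, long earliest, long latest, OptionalLong committed) {
        switch (mode) {
            case EARLIEST: return earliest;
            case LATEST: return latest;
            case APPLICATION_OR_EARLIEST: return committed.orElse(earliest);
            case APPLICATION_OR_LATEST: return committed.orElse(latest);
            default: throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    /** Extracts the instance count N from a value such as "...StatelessPartitioner:16". */
    static int partitionCount(String partitionerSpec) {
        int colon = partitionerSpec.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected <class>:<count>, got " + partitionerSpec);
        }
        return Integer.parseInt(partitionerSpec.substring(colon + 1).trim());
    }

    public static void main(String[] args) {
        OptionalLong none = OptionalLong.empty();
        System.out.println(startOffset(InitialOffset.LATEST, 0L, 500L, none)); // prints: 500
        System.out.println(startOffset(InitialOffset.APPLICATION_OR_EARLIEST, 0L, 500L, OptionalLong.of(120L))); // prints: 120
        System.out.println(startOffset(InitialOffset.APPLICATION_OR_EARLIEST, 0L, 500L, none)); // prints: 0
        System.out.println(partitionCount("com.datatorrent.common.partitioner.StatelessPartitioner:16")); // prints: 16
    }
}
```

Under this reading, the APPLICATION_OR_* modes only differ from EARLIEST and LATEST on a restart, when a previously committed offset is available.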