I have a Kafka Topic Tranfer_History in which I streamed a CSV file. Now I want to count the occurrence of each PARTY_ID. Then after I want to apply the transformation: if the count is less than 20 put it to the new topic CHURN and if greater than 20 put it to topic LOYAL
#I am using JAVA
public class FirstFilterer {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
/*input messages example
{"155555","11.11.2016 11:12}
{"155555","11.11.2016 13:12}
{"155556","11.11.2016 13:12}
result to be achived:
{"155555","2"}
{"155556","1"}
*/
builder.stream("test_topic_3")
// .map()
.groupByKey()
// .windowedBy(Window) // This may or may not be required
.count()
.toStream()
.map(
(key,count) -> new KeyValue<>(key.toString(),count)
)
.to("test_output_filtered_3");//this topic is empty after the run
I am new to Kafka don't know much plz help me out
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…