kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sathya Murthy <smjava2...@gmail.com>
Subject kafka stream processor's process method
Date Sat, 09 Jan 2021 09:41:31 GMT
Hi  there
i m sathya,
i have below requirements in my project , please let me know how to
achieve this requirement.


These are my two kafka stream classes

1. MySource

2. MyProcessor

and Mysource class sends continues stream of data and retrieved in process
method of Myprocessor class.

My requirements are

1) When my each message is processed inside process method, I need to send
response back to MySource class.(either SUCCESS/FAILED)

2) When it unsuccessful like any exception thrown while invoking service
call (newApplication.service(value);)

The process method should stop consume any messages further to prevent data
loss.

could you please help me on this.

1) MySource class

Kstreambuilder .build ().addSource (READ_FROM_TOPIC,
Serdes.String.deserialzer (), Serdes.String.deserialzer (), messages)

.addProcessor (TRAN_PROCESSOR,()->new MyProcessor(),READ_FROM_TOPIC)

2) MyProcessor class

Public class MyProcessor implements Processor<String,String>{

Public void process (String key,String value){

Try{

newApplication.service(value);

} catch (exception e){

}

}

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message