kafka-users mailing list archives

From Suresh Chidambaram <chida.sur...@gmail.com>
Subject Re: Previous State of a Kafka Message
Date Mon, 07 Dec 2020 17:12:42 GMT
Hi Andrew,

Thank you for the suggestion. Creating the custom Processor helped me
achieve the requirement. Thank you once again.

Thanks
C Suresh

On Sunday, December 6, 2020, Andrew Grant <andrewgrant243@gmail.com> wrote:

> Hi Suresh,
>
> It seems the keys are different for each message - I see 0, 1 and 2. When
> you say "This topic contains the same keys" do you mean the message
> contents contain the same "attr1" "attr2" "attr3" keys? If so, I think the
> first step would be to re-key the messages so the new key is
> "attr1:attr2:attr3" or something like that - the solutions I can think of
> require all messages that make up the merged message be sent to the same
> partition.
>
> Once that is done, I see two options.
>
> The first one would be to write a Kafka Streams application that uses the
> Processor API - see
> https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> You could create a state store and use that to maintain the merged message.
> As each message comes in, you update the merged value in the state store.
>
> The second option I can think of would be to use the Kafka Streams DSL
> "groupByKey" operator - see
> https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating
> This lets you declaratively define an aggregation function that would do
> the merging. I'd suggest this option if possible as it's less code and you
> don't have to manage a state store yourself.
>
> Both solutions require all messages that make up the merged message be sent
> to the same partition, hence the suggestion to re-key mentioned above.
>
> It's not clear to me if you require only the "finalized" message be emitted
> downstream. In either case you could filter emitted messages so only the
> one with all the keys is passed to downstream operators.
>
> Hope that helps.
>
> Thanks,
> Andrew
>
> On Sun, Dec 6, 2020 at 9:39 AM Suresh Chidambaram <chida.suresh@gmail.com>
> wrote:
>
> > Hi,
> >
> > Greetings!
> >
> > My requirement is as below.
> >
> > I have a topic named "sample-topic". This topic contains the same keys (as
> > String) with multiple messages, and the messages are in JSON format. I would
> > like to merge the JSON messages and produce a final JSON. To achieve this,
> > how can I maintain the state of the previous message?
> >
> > Below are the input messages in the topic.
> >
> > Key : 0       Message: { "attr1" : "value1", "attr2" : "value_01", "attr3" : "value_3" }
> >
> > Key : 1       Message: { "attr1" : "value_x", "attr2" : "value2", "attr3" : "value_y" }
> >
> > Key : 2       Message: { "attr1" : "value_m", "attr2" : null, "attr3" : "value_o" }
> >
> >
> > Now, I need the resultant JSON as below.
> >
> > { "attr1": "value_m", "attr2": "value2" , "attr3" : "value_o"}
> >
> >
> > Could someone help me by letting me know the solution? Thank you.
> >
> > Thanks
> >
> > C Suresh
> >
>
>
> --
> Andrew Grant
> 8054482621
>
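For readers following the thread, here is a minimal sketch of the merge step that either option would need. It is plain Java with no Kafka dependency: a HashMap stands in for the state store, and messages are modelled as maps rather than parsed JSON. The merge rule (a non-null incoming value overwrites the stored one, a null leaves it untouched) is an assumption inferred from the example input and output in the original question. The names MergeSketch, merge, process and msg are hypothetical and do not come from the thread or from the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; all names here are illustrative only.
final class MergeSketch {

    // Stand-in for a state store: group key -> merged message so far.
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Assumed rule: a non-null incoming value replaces the stored value,
    // a null incoming value keeps whatever was stored before.
    static Map<String, String> merge(Map<String, String> previous,
                                     Map<String, String> incoming) {
        Map<String, String> merged = new LinkedHashMap<>(previous);
        for (Map.Entry<String, String> e : incoming.entrySet()) {
            if (e.getValue() != null) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }

    // Called once per message: read previous state, merge, write it back.
    Map<String, String> process(String groupKey, Map<String, String> message) {
        Map<String, String> previous =
                store.getOrDefault(groupKey, new LinkedHashMap<>());
        Map<String, String> merged = merge(previous, message);
        store.put(groupKey, merged);
        return merged;
    }

    // Helper that builds one of the three-attribute messages from the question.
    static Map<String, String> msg(String attr1, String attr2, String attr3) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("attr1", attr1);
        m.put("attr2", attr2);
        m.put("attr3", attr3);
        return m;
    }

    public static void main(String[] args) {
        MergeSketch processor = new MergeSketch();
        String key = "attr1:attr2:attr3"; // the re-keyed group key suggested above
        processor.process(key, msg("value1", "value_01", "value_3"));
        processor.process(key, msg("value_x", "value2", "value_y"));
        // prints {attr1=value_m, attr2=value2, attr3=value_o}
        System.out.println(processor.process(key, msg("value_m", null, "value_o")));
    }
}
```

In a real Streams application the map would be replaced by a key-value state store (Processor API) or the merge function would be passed as the aggregator after groupByKey (DSL); the per-message logic stays the same. This only works if all three messages share one key, which is why the re-keying step comes first.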
