kafka-users mailing list archives

From: Something Something <mailinglist...@gmail.com>
Subject: Re: Integrating Kafka with Stateful Spark Streaming
Date: Wed, 04 Mar 2020 18:38:43 GMT
Yes, I have. No response from them. I thought someone in the Kafka community
might know the answer. Thanks.

On Wed, Mar 4, 2020 at 9:49 AM Boyang Chen <reluctanthero104@gmail.com>
wrote:

> Hey there,
>
> have you already sought help from the Spark community? Currently I don't
> think we can attribute the symptom to Kafka.
>
> Boyang
>
> On Wed, Mar 4, 2020 at 7:37 AM Something Something <
> mailinglists19@gmail.com>
> wrote:
>
> > Need help integrating Kafka with a 'Stateful Spark Streaming' application.
> >
> > In a Stateful Spark Streaming application I am writing the 'OutputRow' in
> > 'updateAcrossEvents', but I keep getting this error (*Required attribute
> > 'value' not found*) when it tries to write to Kafka. I know from the
> > documentation that the 'value' attribute needs to be set, but how do I do
> > that in 'Stateful Structured Streaming'? Where and how do I add this
> > 'value' attribute in the following code? *Note: I am using Spark 2.3.1*
> >
> > withEventTime
> >       .as[R00tJsonObject]
> >       .withWatermark("event_time", "5 minutes")
> >       .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
> >       .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
> >       .writeStream
> >       .format("kafka")
> >       .option("kafka.bootstrap.servers", "localhost:9092")
> >       .option("topic", "myTopic")
> >       .option("checkpointLocation", "/Users/username/checkpointLocation")
> >       .outputMode("update")
> >       .start()
> >       .awaitTermination()
> >
>
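
For reference, the Kafka sink in Structured Streaming expects the output
DataFrame to contain a string (or binary) column named 'value' (plus optional
'key' and 'topic' columns). A minimal sketch of one way to satisfy that,
assuming 'updateAcrossEvents' returns a case class (the 'OutputRow' mentioned
in the question), is to convert the typed Dataset back to a DataFrame and
serialize each row to JSON under the alias 'value' before calling writeStream:

    withEventTime
      .as[R00tJsonObject]
      .withWatermark("event_time", "5 minutes")
      .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
      .toDF()                                     // back to an untyped DataFrame of OutputRow fields
      .selectExpr("to_json(struct(*)) AS value")  // Kafka sink requires a column named 'value'
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "myTopic")
      .option("checkpointLocation", "/Users/username/checkpointLocation")
      .outputMode("update")
      .start()
      .awaitTermination()

A typed '.map' that builds the JSON (or any other string) itself, followed by
'.toDF("value")', should work just as well; the key point is that the column
written to Kafka must literally be named 'value'.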
