kafka-users mailing list archives

From Something Something <mailinglist...@gmail.com>
Subject Integrating Kafka with Stateful Spark Streaming
Date Wed, 04 Mar 2020 15:36:47 GMT
Need help integrating Kafka with a 'Stateful Spark Streaming' application.

In my Stateful Structured Streaming application I create the 'OutputRow' in
'updateAcrossEvents', but when the query tries to write to Kafka I keep
getting this error: *Required attribute 'value' not found*. I know from the
documentation that the 'value' attribute needs to be set, but how do I do that
in 'Stateful Structured Streaming'? Where and how do I add this 'value'
attribute in the following code? *Note: I am using Spark 2.3.1*

withEventTime
      .as[R00tJsonObject]
      .withWatermark("event_time", "5 minutes")
      .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "myTopic")
      .option("checkpointLocation", "/Users/username/checkpointLocation")
      .outputMode("update")
      .start()
      .awaitTermination()
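The Spark Kafka integration guide says the data being written to Kafka must have a `value` column (string or binary), while `key` and `topic` columns are optional. `mapGroupsWithState` returns a `Dataset[OutputRow]` whose columns are simply the fields of `OutputRow`, so there is no `value` column unless one is built before `.writeStream`. The sketch below is one way to do that, not necessarily the only one: it assumes a hypothetical `OutputRow(id, time, cId, count)` (the real class isn't shown in the message) and serializes each row to JSON with `to_json(struct(...))`, which exists in Spark 2.3.1. It runs on a static Dataset so it can be tried locally without a Kafka broker.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical stand-in for the poster's OutputRow; the real fields are not shown.
case class OutputRow(id: String, time: String, cId: String, count: Long)

object KafkaValueSketch {
  // The Kafka sink requires a "value" column (string or binary) and accepts an
  // optional "key" column. Here every field of OutputRow is packed into one
  // JSON string named "value", and the (assumed) id field is used as the key.
  def toKafkaRecords(ds: Dataset[OutputRow]): DataFrame =
    ds.select(
      col("id").as("key"),                                  // optional message key
      to_json(struct(ds.columns.map(col): _*)).as("value")  // required payload
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-value-sketch")
      .getOrCreate()
    import spark.implicits._

    // A static Dataset stands in for the output of mapGroupsWithState.
    val outputs = Seq(OutputRow("a1", "2020-03-04 15:30:00", "c1", 3L)).toDS()
    val records = toKafkaRecords(outputs)

    records.printSchema()          // two string columns: key and value
    records.show(truncate = false)
    spark.stop()
  }
}
```

In the streaming query, the same `select` would sit between `.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)` and `.writeStream`. Because the `topic` option is already set on the writer, no `topic` column is needed.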
