Hi:

I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR. For the last few days, after running the application for 30-60 minutes, I get the exception from the Kafka consumer included below.

The structured streaming application processes one minute's worth of data from a Kafka topic at a time. I've tried increasing request.timeout.ms from its default of 40000 ms to 45000 ms, and receive.buffer.bytes to 1 MB, but I still get the same exception.
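For reference, this is roughly how the consumer settings are being passed to the Kafka source (the broker list, topic name, and exact values are placeholders for my actual ones; note that consumer properties need the "kafka." prefix to reach the underlying consumer):

```scala
// Sketch of the readStream configuration, assuming an existing SparkSession `spark`.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
  .option("subscribe", "my-topic")                     // placeholder topic
  // Kafka consumer properties must carry the "kafka." prefix;
  // both values below are in their native units (ms and bytes).
  .option("kafka.request.timeout.ms", "45000")
  .option("kafka.receive.buffer.bytes", "1048576")
  .load()
```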

Is there any Spark/Kafka configuration that saves the offset and retries it on the next trigger, rather than throwing an exception and killing the application?
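To illustrate what I mean, here is a minimal sketch of the write side (the sink format, paths, and trigger interval are placeholders, not my actual setup). Offsets are already checkpointed per trigger, so on a restart the query resumes from the checkpoint; what I'm looking for is a setting that makes a fetch timeout fall back to the checkpointed offset on the next trigger instead of failing the whole query:

```scala
import org.apache.spark.sql.streaming.Trigger

// Sketch, assuming `df` is the streaming DataFrame read from Kafka.
val query = df
  .writeStream
  .format("parquet")                                      // placeholder sink
  .option("path", "s3://my-bucket/output/")               // placeholder path
  .option("checkpointLocation", "s3://my-bucket/ckpt/")   // offsets persisted here
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
```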

I've tried googling but have not found a substantive solution or recommendation. If anyone has any suggestions, or knows whether a different version helps, please let me know.

Thanks

Here is the exception stack trace.

java.util.concurrent.TimeoutException: Cannot fetch record for offset <offset#> in 120000 milliseconds
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at