spark-user mailing list archives

From shyla deshpande <>
Subject kafka setting enable.auto.commit to false is being overridden and I lose data. please help!
Date Sat, 05 Aug 2017 22:10:16 GMT
Hello All,
I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.

I am setting enable.auto.commit to false, and I want to commit the
offsets manually after my output operation succeeds, so that when an
exception is raised during processing the offsets are not committed.
But it looks like the offsets are committed automatically even when an
exception is raised, and I am losing data.
In my logs I see:  WARN  overriding enable.auto.commit to false for
executor.  But I don't want it to be overridden. Please help.

My code looks like..

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Group1",
      "auto.offset.reset" -> offsetresetparameter,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val myTopics = Array("topic1")
    val stream1 = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](myTopics, kafkaParams)
    )

    stream1.foreachRDD { (rdd, time) =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      try {
        // save the rdd to the Cassandra database
      } catch {
        case ex: Exception =>
          println(ex.toString + " !!!!!! Bad Data, Unable to persist into table !!!!! " +
            errorOffsetRangesToString(offsetRanges))
      }
    }
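For context, the commit-after-success pattern documented for spark-streaming-kafka-0-10 is to call CanCommitOffsets.commitAsync on the original stream only after the output operation returns without throwing. A sketch of that placement (the save call is a placeholder for whatever output operation is used; `stream1` is the direct stream from above):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream1.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // output operation, e.g. saving the rdd to Cassandra (placeholder)

    // only reached if the save did not throw
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception =>
      // do not commit; these offsets will be reprocessed on restart
      println(s"failed batch, not committing offsets: $ex")
  }
}
```

Note that commitAsync queues the commit to happen on the driver at the next batch boundary, so even a committed batch may be redelivered once after a crash; the output operation should tolerate replays.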

