spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!
Date Mon, 07 Aug 2017 18:58:27 GMT
Yes

On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
<deshpandeshyla@gmail.com> wrote:
> Thanks Cody again.
>
> No. I am mapping the Kafka ConsumerRecord so that I can save it to the
> Cassandra table. saveToCassandra is an action, and my data does get saved
> into Cassandra. It works as expected 99% of the time, except that when
> there is an exception, I do not want the offsets to be committed.
>
> By filtering for unsuccessful attempts, do you mean filtering the bad
> records...
>
> On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> If literally all you are doing is rdd.map I wouldn't expect
>> saveToCassandra to happen at all, since map is not an action.
>>
>> Filtering for unsuccessful attempts and collecting those back to the
>> driver would be one way for the driver to know whether it was safe to
>> commit.
>>
>> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
>> <deshpandeshyla@gmail.com> wrote:
>> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
>> > running on executor
>> >
>> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
>> > running on driver.
>> >
>> > Is this the reason why kafka offsets are committed even when an
>> > exception is raised? If so, is there a way to commit the offsets only
>> > when there are no exceptions?
>> >
>> >
>> >
>> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
>> > <deshpandeshyla@gmail.com>
>> > wrote:
>> >>
>> >> Thanks again Cody,
>> >>
>> >> My understanding is all the code inside foreachRDD is running on the
>> >> driver except for
>> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>> >>
>> >> When the exception is raised, I was thinking I won't be committing the
>> >> offsets, but the offsets are committed all the time, independent of
>> >> whether an exception was raised or not.
>> >>
>> >> It will be helpful if you can explain this behavior.
>> >>
>> >>
>> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <cody@koeninger.org>
>> >> wrote:
>> >>>
>> >>> I mean that the kafka consumers running on the executors should not be
>> >>> automatically committing, because the fact that a message was read by
>> >>> the consumer has no bearing on whether it was actually successfully
>> >>> processed after reading.
>> >>>
>> >>> It sounds to me like you're confused about where code is running.
>> >>> foreachRDD runs on the driver, not the executor.
>> >>>
>> >>>
>> >>>
>> >>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>> >>>
>> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>> >>> <deshpandeshyla@gmail.com> wrote:
>> >>> > Thanks Cody for your response.
>> >>> >
>> >>> > All I want to do is commit the offsets only if I am able to
>> >>> > successfully write to the Cassandra database.
>> >>> >
>> >>> > The line //save the rdd to Cassandra database is
>> >>> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")
>> >>> >
>> >>> > What do you mean by "Executors shouldn't be auto-committing, that's
>> >>> > why it's being overridden"? It is the executors that do the mapping
>> >>> > and saving to Cassandra. The success or failure of this operation is
>> >>> > known only on the executor, and that's where I want to commit the
>> >>> > Kafka offsets. If this is not what I should be doing, then what is
>> >>> > the right way?
>> >>> >
>> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <cody@koeninger.org>
>> >>> > wrote:
>> >>> >>
>> >>> >> If your complaint is about offsets being committed that you didn't
>> >>> >> expect... auto commit being false on executors shouldn't have
>> >>> >> anything to do with that.  Executors shouldn't be auto-committing,
>> >>> >> that's why it's being overridden.
>> >>> >>
>> >>> >> What you've said and the code you posted isn't really enough to
>> >>> >> explain what your issue is, e.g.
>> >>> >>
>> >>> >> is this line
>> >>> >> // save the rdd to Cassandra database
>> >>> >> a blocking call
>> >>> >>
>> >>> >> are you sure that the rdd foreach isn't being retried and
>> >>> >> succeeding the second time around, etc
>> >>> >>
>> >>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>> >>> >> <deshpandeshyla@gmail.com> wrote:
>> >>> >> > Hello All,
>> >>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
>> >>> >> >
>> >>> >> > I am setting enable.auto.commit to false and want to manually
>> >>> >> > commit the offsets after my output operation is successful. So
>> >>> >> > when an exception is raised during the processing, I do not want
>> >>> >> > the offsets to be committed. But it looks like the offsets are
>> >>> >> > automatically committed even when the exception is raised, and
>> >>> >> > thereby I am losing data.
>> >>> >> > In my logs I see: WARN overriding enable.auto.commit to false for
>> >>> >> > executor. But I don't want it to override. Please help.
>> >>> >> >
>> >>> >> > My code looks like..
>> >>> >> >
>> >>> >> >     val kafkaParams = Map[String, Object](
>> >>> >> >       "bootstrap.servers" -> brokers,
>> >>> >> >       "key.deserializer" -> classOf[StringDeserializer],
>> >>> >> >       "value.deserializer" -> classOf[StringDeserializer],
>> >>> >> >       "group.id" -> "Group1",
>> >>> >> >       "auto.offset.reset" -> offsetresetparameter,
>> >>> >> >       "enable.auto.commit" -> (false: java.lang.Boolean)
>> >>> >> >     )
>> >>> >> >
>> >>> >> >     val myTopics = Array("topic1")
>> >>> >> >     val stream1 = KafkaUtils.createDirectStream[String, String](
>> >>> >> >       ssc,
>> >>> >> >       PreferConsistent,
>> >>> >> >       Subscribe[String, String](myTopics, kafkaParams)
>> >>> >> >     )
>> >>> >> >
>> >>> >> >     stream1.foreachRDD { (rdd, time) =>
>> >>> >> >         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >>> >> >         try {
>> >>> >> >             //save the rdd to Cassandra database
>> >>> >> >             stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> >>> >> >         } catch {
>> >>> >> >           case ex: Exception => {
>> >>> >> >             println(ex.toString + "!!!!!! Bad Data, Unable to persist into table !!!!!" +
>> >>> >> >               errorOffsetRangesToString(offsetRanges))
>> >>> >> >           }
>> >>> >> >         }
>> >>> >> >     }
>> >>> >> >
>> >>> >> >     ssc.start()
>> >>> >> >     ssc.awaitTermination()
>> >>> >
>> >>> >
>> >>
>> >>
>> >
>
>
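
The suggestion earlier in the thread (filter for unsuccessful attempts, collect them back to the driver, and commit only when there are none) can be modelled roughly as follows. This is only a sketch under stated assumptions, not code from any poster: plain Scala collections stand in for the RDD's partitions so the example runs without Spark, Kafka or Cassandra, and Record, writeRecord, failedOffsets and processBatch are hypothetical names. In a real job the filtering would run on the executors, the failures would be brought to the driver with collect(), and the commit callback would be the commitAsync(offsetRanges) call from the original code.

```scala
import scala.util.Try

object CommitOnSuccess {
  // Stand-in for one Kafka record (hypothetical; a real job would see ConsumerRecord).
  final case class Record(offset: Long, value: String)

  // Hypothetical sink write: throws on bad data, as a failed insert might.
  def writeRecord(r: Record): Unit =
    if (r.value.isEmpty)
      throw new IllegalArgumentException(s"empty value at offset ${r.offset}")

  // Executor-side work: attempt every write in one partition and
  // keep only the offsets of the records whose write failed.
  def failedOffsets(partition: Seq[Record]): Seq[Long] =
    partition.filter(r => Try(writeRecord(r)).isFailure).map(_.offset)

  // Driver-side decision: gather the failures from all partitions and
  // invoke the commit callback only when there are none.
  // Returns true when the commit callback was invoked.
  def processBatch(partitions: Seq[Seq[Record]], commit: () => Unit): Boolean = {
    val failures = partitions.flatMap(failedOffsets) // models collecting to the driver
    if (failures.isEmpty) {
      commit()
      true
    } else {
      println(s"Not committing; failed offsets: ${failures.mkString(", ")}")
      false
    }
  }

  def main(args: Array[String]): Unit = {
    var commits = 0
    val goodBatch = Seq(Seq(Record(0L, "a"), Record(1L, "b")), Seq(Record(2L, "c")))
    val badBatch  = Seq(Seq(Record(3L, "d"), Record(4L, "")))
    println(processBatch(goodBatch, () => commits += 1))
    println(processBatch(badBatch, () => commits += 1))
    println(s"commits = $commits")
  }
}
```

The point of this shape is that the write failures are turned into data the driver can inspect, so the decision to commit is made in the same place (the driver) where the commit call runs.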

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org

