spark-user mailing list archives

From Srikanth <srikanth...@gmail.com>
Subject Re: Reset auto.offset.reset in Kafka 0.10 integ
Date Wed, 07 Sep 2016 16:44:56 GMT
Yes that's right.

I understand this means data loss. The restart doesn't have to be all that
silent; it requires us to set a flag, and I thought auto.offset.reset was
that flag.
But there isn't much I can do at this point, given that retention has
already cleaned things up.
The app has to start; admins can address the data loss on the side.
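The disagreement above can be sketched as plain logic. The object and method below are hypothetical, made up for illustration, and are not Spark's or Kafka's actual code; they just show why a stored-but-expired offset combined with reset policy `none` (which the 0-10 integration forces on executors) surfaces as an exception, while `earliest` would silently rewind:

```scala
// Hypothetical sketch of starting-offset resolution -- NOT Spark's actual
// code; names and signature are invented for illustration.
//   committed       -- the consumer group's stored offset, if any
//   earliest/latest -- the partition's currently retained offset range
//   reset           -- effective auto.offset.reset: "earliest", "latest", "none"
object OffsetResolution {
  def startingOffset(committed: Option[Long],
                     earliest: Long,
                     latest: Long,
                     reset: String): Long =
    committed.filter(o => o >= earliest && o <= latest).getOrElse {
      reset match {
        case "earliest" => earliest  // silent rewind: records before this are gone
        case "latest"   => latest    // silent skip: everything in between is lost
        case "none" => throw new IllegalStateException(
          "Offsets out of range with no configured reset policy")
        case other => throw new IllegalArgumentException(s"unknown policy: $other")
      }
    }
}
```

With a committed offset that has fallen below the earliest retained offset, `"none"` throws (the behavior in the stack trace below), whereas `"earliest"` would return the earliest retained offset and carry on.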

On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Just so I'm clear on what's happening...
>
> - you're running a job that auto-commits offsets to kafka.
> - you stop that job for longer than your retention
> - you start that job back up, and it errors because the last committed
> offset is no longer available
> - you think that instead of erroring, the job should silently restart
> based on the value of auto.offset.reset
>
> Is that accurate?
>
>
> On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth.ht@gmail.com> wrote:
> > My retention is 1d, which isn't terribly low. The problem is that every
> > time I restart after retention expiry, I get this exception instead of it
> > honoring auto.offset.reset.
> > It isn't a corner case where retention expired after the driver created a
> > batch. It's easily reproducible and consistent.
> >
> > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <cody@koeninger.org> wrote:
> >>
> >> You don't want auto.offset.reset on executors; you want executors to
> >> do what the driver told them to do.  Otherwise you're going to get
> >> really horrible data inconsistency issues if the executors silently
> >> reset.
> >>
> >> If your retention is so low that retention gets expired in between
> >> when the driver created a batch with a given starting offset, and when
> >> an executor starts to process that batch, you're going to have
> >> problems.
> >>
> >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth.ht@gmail.com> wrote:
> >> > This isn't a production setup; we kept retention low intentionally.
> >> > My original question was: why did I get the exception on restart
> >> > instead of it honoring auto.offset.reset?
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <cody@koeninger.org> wrote:
> >> >>
> >> >> If you leave enable.auto.commit set to true, it will commit offsets
> >> >> to Kafka, but you will get undefined delivery semantics.
> >> >>
> >> >> If you just want to restart from a fresh state, the easiest thing to
> >> >> do is use a new consumer group name.
> >> >>
> >> >> But if that keeps happening, you should look into why your retention
> >> >> is not sufficient.
> >> >>
> >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth.ht@gmail.com> wrote:
> >> >> > You are right. I got confused since it's all part of the same log
> >> >> > when running from the IDE.
> >> >> > I was looking for a good guide to read to understand this
> >> >> > integration.
> >> >> >
> >> >> > I'm not managing offsets on my own. I've not enabled checkpointing
> >> >> > for my tests.
> >> >> > I assumed offsets would be stored in Kafka by default.
> >> >> >
> >> >> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
> >> >> >       ssc, PreferConsistent,
> >> >> >       SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
> >> >> >
> >> >> >    * @param offsets: offsets to begin at on initial startup. If no
> >> >> >    * offset is given for a TopicPartition, the committed offset
> >> >> >    * (if applicable) or kafka param auto.offset.reset will be used.
> >> >> >
> >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> >> > enable.auto.commit = true
> >> >> > auto.offset.reset = latest
> >> >> >
> >> >> > Srikanth
> >> >> >
> >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <cody@koeninger.org> wrote:
> >> >> >>
> >> >> >> Seems like you're confused about the purpose of that line of code;
> >> >> >> it applies to executors, not the driver. The driver is responsible
> >> >> >> for determining offsets.
> >> >> >>
> >> >> >> Where are you storing offsets: in Kafka, checkpoints, or your own
> >> >> >> store?
> >> >> >> Auto offset reset won't be used if there are stored offsets.
> >> >> >>
> >> >> >>
> >> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth.ht@gmail.com>
wrote:
> >> >> >>>
> >> >> >>> Hi,
> >> >> >>>
> >> >> >>> Upon restarting my Spark Streaming app, it is failing with this
> >> >> >>> error:
> >> >> >>>
> >> >> >>> Exception in thread "main" org.apache.spark.SparkException:
Job
> >> >> >>> aborted
> >> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times,
most
> >> >> >>> recent
> >> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> >> >> >>> Offsets
> >> >> >>> out of
> >> >> >>> range with no configured reset policy for partitions:
> >> >> >>> {mt-event-2=1710706}
> >> >> >>>
> >> >> >>> It is correct that the last read offset was deleted by Kafka due
> >> >> >>> to retention period expiry.
> >> >> >>> I've set auto.offset.reset in my app, but it is getting reset here:
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
> >> >> >>>
> >> >> >>> How do I force it to restart in this case (fully aware of the
> >> >> >>> potential data loss)?
> >> >> >>>
> >> >> >>> Srikanth
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>
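One way to get the restart behavior asked about above, without relying on auto.offset.reset at all, is to clamp the stored offsets into the retained range at startup and pass them to the stream explicitly. The helper below is a hypothetical sketch; `clampToRetained` is not a Spark or Kafka API, and in a real app the `earliest` map would be fetched with a plain Kafka consumer before the stream is built:

```scala
// Hypothetical helper -- not a Spark or Kafka API. Any stored offset that
// has fallen below the earliest retained offset is moved up to it, so a
// restarted job can proceed while the data loss stays explicit and visible.
object OffsetClamp {
  def clampToRetained(stored: Map[String, Long],
                      earliest: Map[String, Long]): Map[String, Long] =
    stored.map { case (tp, off) =>
      // If we have no earliest offset for this partition, leave it alone.
      tp -> math.max(off, earliest.getOrElse(tp, off))
    }
}
```

The clamped map could then be supplied as the explicit offsets argument to the 0-10 consumer strategy, so the driver starts every partition from a position Kafka still retains instead of throwing OffsetOutOfRangeException.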
