From Srikanth <srikanth...@gmail.com>
Subject Re: Reset auto.offset.reset in Kafka 0.10 integ
Date Wed, 07 Sep 2016 15:44:48 GMT
My retention is 1d, which isn't terribly low. The problem is that every
time I restart after retention expiry, I get this exception instead of it
honoring auto.offset.reset.
It isn't a corner case where retention expired after the driver created a
batch. It's easily reproducible and consistent.
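
For reference, this is roughly the workaround I'm considering: seeding
explicit starting offsets so the driver never asks Kafka for an
out-of-range position. A sketch only; the topic, partition, and offset
values are illustrative, and ssc/kafkaParams are as in my app:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // Explicit starting offsets take precedence over committed offsets
    // and auto.offset.reset for the listed partitions.
    val fromOffsets = Map(new TopicPartition("mt-event", 2) -> 0L)
    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc, PreferConsistent,
      Subscribe[Array[Byte], Array[Byte]](Seq("mt-event"), kafkaParams, fromOffsets))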

On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <cody@koeninger.org> wrote:

> You don't want auto.offset.reset on executors; you want executors to
> do what the driver told them to do.  Otherwise you're going to get
> really horrible data inconsistency issues if the executors silently
> reset.
>
> If your retention is so low that data gets expired between when the
> driver created a batch with a given starting offset and when an
> executor starts to process that batch, you're going to have
> problems.
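>
> To be concrete, something like this on the driver (just a sketch; the
> broker address and group id are placeholders):
>
>     import org.apache.kafka.common.serialization.ByteArrayDeserializer
>
>     // Driver-side consumer params. Spark pins executor consumers to the
>     // driver-assigned offsets, so the reset policy only matters here.
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "localhost:9092",
>       "key.deserializer" -> classOf[ByteArrayDeserializer],
>       "value.deserializer" -> classOf[ByteArrayDeserializer],
>       "group.id" -> "my-app-group",
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean))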
>
> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth.ht@gmail.com> wrote:
> > This isn't a production setup. We kept retention low intentionally.
> > My original question was why I got the exception instead of it using
> > auto.offset.reset on restart.
> >
> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <cody@koeninger.org> wrote:
> >>
> >> If you leave enable.auto.commit set to true, it will commit offsets to
> >> Kafka, but you will get undefined delivery semantics.
> >>
> >> If you just want to restart from a fresh state, the easiest thing to
> >> do is use a new consumer group name.
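> >>
> >> For example (the new group name is arbitrary, and kafkaParams is
> >> whatever your app already uses):
> >>
> >>     // A group.id with no committed offsets falls back to
> >>     // auto.offset.reset instead of failing on an out-of-range offset.
> >>     val freshParams = kafkaParams + ("group.id" -> "my-app-group-v2")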
> >>
> >> But if that keeps happening, you should look into why your retention
> >> is not sufficient.
> >>
> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth.ht@gmail.com> wrote:
> >> > You are right. I got confused as it's all part of the same log when
> >> > running from the IDE.
> >> > I was looking for a good guide to read to understand this integration.
> >> >
> >> > I'm not managing offsets on my own. I haven't enabled checkpointing
> >> > for my tests.
> >> > I assumed offsets would be stored in Kafka by default.
> >> >
> >> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
> >> >       ssc, PreferConsistent,
> >> >       SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
> >> >
> >> >    * @param offsets: offsets to begin at on initial startup.  If no
> >> >    * offset is given for a TopicPartition, the committed offset (if
> >> >    * applicable) or kafka param auto.offset.reset will be used.
> >> >
> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> > enable.auto.commit = true
> >> > auto.offset.reset = latest
> >> >
> >> > Srikanth
> >> >
> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <cody@koeninger.org> wrote:
> >> >>
> >> >> Seems like you're confused about the purpose of that line of code;
> >> >> it applies to executors, not the driver. The driver is responsible
> >> >> for determining offsets.
> >> >>
> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
> >> >> store?
> >> >> Auto offset reset won't be used if there are stored offsets.
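> >> >>
> >> >> One way to check is to ask Kafka directly what's committed for your
> >> >> group (a sketch; props holds the same bootstrap.servers, group.id,
> >> >> and deserializer settings your app uses):
> >> >>
> >> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
> >> >>     import org.apache.kafka.common.TopicPartition
> >> >>
> >> >>     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
> >> >>     // committed() returns null when nothing is stored for the partition
> >> >>     val om = consumer.committed(new TopicPartition("mt-event", 2))
> >> >>     println(if (om == null) "no committed offset" else om.offset.toString)
> >> >>     consumer.close()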
> >> >>
> >> >>
> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth.ht@gmail.com> wrote:
> >> >>>
> >> >>> Hi,
> >> >>>
> >> >>> Upon restarting my Spark Streaming app it is failing with error
> >> >>>
> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> >> >>> aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
> >> >>> most recent failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
> >> >>> out of range with no configured reset policy for partitions:
> >> >>> {mt-event-2=1710706}
> >> >>>
> >> >>> It is correct that the last read offset was deleted by Kafka due to
> >> >>> retention period expiry.
> >> >>> I've set auto.offset.reset in my app, but it is getting reset here:
> >> >>>
> >> >>>
> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
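> >> >>>
> >> >>> If I'm reading it right, that code rewrites the executor-side
> >> >>> params; the effect is roughly this (my paraphrase, not the actual
> >> >>> source):
> >> >>>
> >> >>>     // Executors get a pinned, non-resetting, non-committing consumer
> >> >>>     // so they only fetch exactly what the driver assigned.
> >> >>>     kafkaParams.put("auto.offset.reset", "none")
> >> >>>     kafkaParams.put("enable.auto.commit", false: java.lang.Boolean)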
> >> >>>
> >> >>> How can I force it to restart in this case (fully aware of the
> >> >>> potential data loss)?
> >> >>>
> >> >>> Srikanth
> >> >
> >> >
> >
> >
>
