spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Matthias Niehoff <matthias.nieh...@codecentric.de>
Subject Re: Problems with new experimental Kafka Consumer for 0.10
Date Tue, 04 Oct 2016 07:18:45 GMT
Hi,
sry for the late reply. A public holiday in Germany.

Yes, its using a unique group id which no other job or consumer group is
using. I have increased the session.timeout to 1 minutes and set the
max.poll.rate to 1000. The processing takes ~1 second.

2016-09-29 4:41 GMT+02:00 Cody Koeninger <cody@koeninger.org>:

> Well, I'd start at the first thing suggested by the error, namely that
> the group has rebalanced.
>
> Is that stream using a unique group id?
>
> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
> <matthias.niehoff@codecentric.de> wrote:
> > Hi,
> >
> > the stacktrace:
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> be
> > completed since the group has already rebalanced and assigned the
> partitions
> > to another member. This means that the time between subsequent calls to
> > poll() was longer than the configured session.timeout.ms, which
> typically
> > implies that the poll loop is spending too much time message processing.
> You
> > can address this either by increasing the session timeout or by reducing
> the
> > maximum size of batches returned in poll() with max.poll.records.
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$
> OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$
> OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(
> RequestFuture.java:167)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(
> RequestFuture.java:133)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(
> RequestFuture.java:107)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$
> RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> clientPoll(ConsumerNetworkClient.java:360)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:224)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:201)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:998)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:937)
> > at
> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
> latestOffsets(DirectKafkaInputDStream.scala:169)
> > at
> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:196)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at
> > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(
> DStream.scala:415)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
> > at scala.Option.orElse(Option.scala:289)
> > at
> > org.apache.spark.streaming.dstream.DStream.getOrCompute(
> DStream.scala:330)
> > at
> > org.apache.spark.streaming.dstream.MapPartitionedDStream.
> compute(MapPartitionedDStream.scala:37)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at
> > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(
> DStream.scala:415)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
> > at
> > org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
> > at scala.Option.orElse(Option.scala:289)
> > at
> > org.apache.spark.streaming.dstream.DStream.getOrCompute(
> DStream.scala:330)
> > at
> > org.apache.spark.streaming.dstream.ForEachDStream.
> generateJob(ForEachDStream.scala:48)
> > at
> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(
> DStreamGraph.scala:117)
> > at
> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(
> DStreamGraph.scala:116)
> > at
> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:241)
> > at
> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:241)
> > at
> > scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > at scala.collection.TraversableLike$class.flatMap(
> TraversableLike.scala:241)
> > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> > at
> > org.apache.spark.streaming.DStreamGraph.generateJobs(
> DStreamGraph.scala:116)
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator$$
> anonfun$3.apply(JobGenerator.scala:248)
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator$$
> anonfun$3.apply(JobGenerator.scala:246)
> > at scala.util.Try$.apply(Try.scala:192)
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator.
> generateJobs(JobGenerator.scala:246)
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator.org$
> apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.
> scala:182)
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:88)
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:87)
> > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >
> > But it seems like the commit is not the actual problem. The job also
> falls
> > behind if I do not commit the offsets. The delay would be ok if the
> > processing time was bigger than the batch size, but thats not the case in
> > any of the microbatches. Imho for some reason one of the microbatches
> falls
> > behind more than session.timeout.ms. Then the consumer we regroup which
> > takes about 1 minute (see timestamps below). Know begins a circle of slow
> > batches each triggering a consumer regroup. Would this be possible?
> >
> >
> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time
> > 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group
> > spark_aggregation_job-kafka010 with generation 6
> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned
> > partitions [sapxm.adserving.log.ad_request-0,
> > sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1,
> > sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3,
> > sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5,
> > sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7,
> > sapxm.adserving.log.ad_request-9] for group
> spark_aggregation_job-kafka010
> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned
> > partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4,
> > sapxm.adserving.log.view-1, sapxm.adserving.log.view-2,
> > sapxm.adserving.log.view-0, sapxm.adserving.log.view-9,
> > sapxm.adserving.log.view-7, sapxm.adserving.log.view-8,
> > sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group
> > spark_aggregation_job-kafka010
> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group
> > spark_aggregation_job-kafka010
> >
> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>
> >> What's the actual stacktrace / exception you're getting related to
> >> commit failure?
> >>
> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
> >> <matthias.niehoff@codecentric.de> wrote:
> >> > Hi everybody,
> >> >
> >> > i am using the new Kafka Receiver for Spark Streaming for my Job. When
> >> > running with old consumer it runs fine.
> >> >
> >> > The Job consumes 3 Topics, saves the data to Cassandra, cogroups the
> >> > topic,
> >> > calls mapWithState and stores the results in cassandra. After that I
> >> > manually commit the Kafka offsets using the commitAsync method of the
> >> > KafkaDStream.
> >> >
> >> > With the new consumer I experience the following problem:
> >> >
> >> > After a certain amount of time (about 4-5 minutes, might be more or
> >> > less)
> >> > there are exceptions that the offset commit failed. The processing
> takes
> >> > less than the batch interval. I also adjusted the session.timeout and
> >> > request.timeout as well as the max.poll.records setting which did not
> >> > help.
> >> >
> >> > After the first offset commit failed the time it takes from kafka
> until
> >> > the
> >> > microbatch is started increases, the processing time is constantly
> below
> >> > the
> >> > batch interval. Moreover further offset commits also fail and as
> result
> >> > the
> >> > delay time builds up.
> >> >
> >> > Has anybody made this experience as well?
> >> >
> >> > Thank you
> >> >
> >> > Relevant Kafka Parameters:
> >> >
> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >> > "auto.offset.reset" -> "largest",
> >> > "enable.auto.commit" -> "false",
> >> > "max.poll.records" -> "1000"
> >> >
> >> >
> >> >
> >> > --
> >> > Matthias Niehoff | IT-Consultant | Agile Software Factory  |
> Consulting
> >> > codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> >> > tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49
> (0)
> >> > 172.1702676
> >> > www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> >> > www.more4fi.de
> >> >
> >> > Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> >> > Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> >> > Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
> >> > Schütz
> >> >
> >> > Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
> >> > vertrauliche
> >> > und/oder rechtlich geschützte Informationen. Wenn Sie nicht der
> richtige
> >> > Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren
> >> > Sie
> >> > bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> >> > beigefügter
> >> > Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen evtl.
> >> > beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> >> > nicht
> >> > gestattet
> >
> >
> >
> >
> > --
> > Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> > codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> > tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> > 172.1702676
> > www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> > www.more4fi.de
> >
> > Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> > Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> > Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
> Schütz
> >
> > Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
> vertrauliche
> > und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> > Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren
> Sie
> > bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter
> > Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen evtl.
> > beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht
> > gestattet
>



-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet

Mime
View raw message