spark-user mailing list archives

From Matthias Niehoff <matthias.nieh...@codecentric.de>
Subject Re: Problems with new experimental Kafka Consumer for 0.10
Date Mon, 17 Oct 2016 11:33:57 GMT
heartbeat.interval.ms default
group.max.session.timeout.ms default
session.timeout.ms 60000

Default values as of Kafka 0.10.

Complete Kafka params:

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> kafkaBrokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> "false",
  "key.deserializer" -> classOf[StringDeserializer].getName,
  "value.deserializer" -> classOf[BytesDeserializer].getName,
  "session.timeout.ms" -> s"${1 * 60 * 1000}",
  "request.timeout.ms" -> s"${2 * 60 * 1000}",
  "max.poll.records" -> "1000"
)


As pointed out, when using different groups for each DirectStream
everything is fine.
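A minimal sketch of the comparison asked about in the quoted question (timeouts relative to batch time), as plain Scala with a hypothetical `TimeoutSettings` type. The checks encode assumptions and are not Kafka's own validation logic:

- heartbeat.interval.ms should stay at or below a third of session.timeout.ms (the guidance in the Kafka consumer docs).
- session.timeout.ms must fall inside the broker's group.min/max.session.timeout.ms range.
- The 0.10.0 consumer expects request.timeout.ms to exceed session.timeout.ms.
- A 0.10.0 consumer only heartbeats from poll(), and the driver's consumer polls about once per batch, so the batch interval should stay well below session.timeout.ms.

The numbers in the usage check are illustrative, not verified defaults.

```scala
// Hypothetical helper: flags timeout combinations that may lead to consumer
// rebalances with the 0.10 direct stream. All values are in milliseconds.
final case class TimeoutSettings(
    heartbeatIntervalMs: Long,
    sessionTimeoutMs: Long,
    requestTimeoutMs: Long,
    groupMinSessionTimeoutMs: Long, // broker-side setting
    groupMaxSessionTimeoutMs: Long, // broker-side setting
    batchIntervalMs: Long
)

object TimeoutCheck {
  // Returns one message per violated rule of thumb (empty list = nothing flagged).
  def warnings(s: TimeoutSettings): List[String] = {
    val checks = List(
      (s.heartbeatIntervalMs * 3 > s.sessionTimeoutMs) ->
        "heartbeat.interval.ms is more than 1/3 of session.timeout.ms",
      (s.sessionTimeoutMs < s.groupMinSessionTimeoutMs ||
        s.sessionTimeoutMs > s.groupMaxSessionTimeoutMs) ->
        "session.timeout.ms is outside the broker's allowed range",
      (s.requestTimeoutMs <= s.sessionTimeoutMs) ->
        "request.timeout.ms is not greater than session.timeout.ms",
      // Assumption: the driver consumer heartbeats only when it polls, once per batch.
      (s.batchIntervalMs >= s.sessionTimeoutMs) ->
        "batch interval is not shorter than session.timeout.ms"
    )
    checks.collect { case (true, msg) => msg }
  }
}
```

With the values from this thread (60 s session timeout, 120 s request timeout, 2 s batches), an assumed 3 s heartbeat, and an assumed broker range of 6 s to 300 s, nothing is flagged. That suggests the timeouts alone may not explain the rebalances.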

2016-10-15 2:42 GMT+02:00 Cody Koeninger <cody@koeninger.org>:

> For you or anyone else having issues with consumer rebalance, what are
> your settings for
>
> heartbeat.interval.ms
> session.timeout.ms
> group.max.session.timeout.ms
>
> relative to your batch time?
>
> On Tue, Oct 11, 2016 at 10:19 AM, static-max <flashacid@googlemail.com> wrote:
> > Hi,
> >
> > I ran into the same exception
> > (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> > be completed since the group has already rebalanced ...), but I only use one
> > stream.
> > I get the exceptions when trying to manually commit the offset to Kafka:
> >
> > OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
> > CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
> > cco.commitAsync(offsets);
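This is my reading of the spark-streaming-kafka-0-10 integration, not checked against the exact 2.0.x source: commitAsync on the stream does not contact Kafka immediately. Instead, the ranges are queued on the driver, and the driver's consumer commits them during a later batch. That would fit the stack trace in this thread, where the CommitFailedException surfaces inside KafkaConsumer.poll called from DirectKafkaInputDStream.latestOffsets.

Below is a toy model of collapsing such a queue into one offset per partition. `QueuedRange` is a hypothetical stand-in for Spark's OffsetRange.

```scala
// Hypothetical stand-in for Spark's OffsetRange, with only the fields used here.
final case class QueuedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object CommitQueueSketch {
  // Collapse queued ranges into one offset per (topic, partition), keeping the
  // highest untilOffset, i.e. the position the group should resume from.
  def merge(queued: Seq[QueuedRange]): Map[(String, Int), Long] =
    queued
      .groupBy(r => (r.topic, r.partition))
      .map { case (tp, ranges) => tp -> ranges.map(_.untilOffset).max }
}
```

If the group has rebalanced by the time this merged commit is sent, the broker rejects it. Under this reading, the commit failure is a symptom of the rebalance rather than its cause.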
> >
> > I tried setting "max.poll.records" to 1000 but this did not help.
> >
> > Any idea what could be wrong?
> >
> > 2016-10-11 15:36 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>
> >> The new underlying Kafka consumer prefetches data and is generally heavier
> >> weight, so it is cached on executors. Group id is part of the cache key. I
> >> assumed Kafka users would use different group ids for consumers they wanted
> >> to be distinct, since doing otherwise would cause problems even with the
> >> normal Kafka consumer, but that appears to be a poor assumption.
> >>
> >> I'll figure out a way to make this more obvious.
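Here is a toy model of the caching described above: executors keep one consumer per (group id, topic, partition) and reuse it across batches. `StubConsumer`, `CacheKey` and `ExecutorConsumerCache` are hypothetical names. The sketch shows only the keying idea and is not Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an expensive, prefetching KafkaConsumer.
final class StubConsumer(val groupId: String, val topic: String, val partition: Int)

// Cache key: group id plus topic-partition, as described in the mail above.
final case class CacheKey(groupId: String, topic: String, partition: Int)

object ExecutorConsumerCache {
  private val cache = mutable.HashMap.empty[CacheKey, StubConsumer]

  // Reuse the consumer for this key if one exists; otherwise create and cache it.
  def get(groupId: String, topic: String, partition: Int): StubConsumer =
    synchronized {
      cache.getOrElseUpdate(
        CacheKey(groupId, topic, partition),
        new StubConsumer(groupId, topic, partition))
    }

  def size: Int = synchronized(cache.size)
}
```

In this model, two streams that share a group id and read the same topic-partition get the same cached consumer instance, while a different group id gets its own.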
> >>
> >>
> >> On Oct 11, 2016 8:19 AM, "Matthias Niehoff"
> >> <matthias.niehoff@codecentric.de> wrote:
> >>
> >> Good point, I changed the group id to be unique for the separate streams
> >> and now it works. Thanks!
> >>
> >> Although changing this is OK for us, I am interested in the why :-) With
> >> the old connector this was not a problem, nor is it, AFAIK, with the pure
> >> Kafka consumer API.
> >>
> >> 2016-10-11 14:30 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>>
> >>> Just out of curiosity, have you tried using separate group ids for the
> >>> separate streams?
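Below is a sketch of the suggested fix: each direct stream gets its own group.id, derived from a common prefix. `PerStreamGroupIds` and the `<prefix>-<topic>` naming are illustrative choices, not something the Kafka integration requires.

The second function flags group ids used by more than one stream. Those streams' driver consumers would join the same consumer group and rebalance against each other. The log lines later in this thread show this pattern: ad_request partitions are assigned while view partitions are revoked for the same group.

```scala
object PerStreamGroupIds {
  // Derive a distinct group.id for one stream from a shared prefix.
  // The "<prefix>-<topic>" scheme is only an illustration.
  def paramsFor(base: Map[String, String], groupPrefix: String, topic: String): Map[String, String] =
    base + ("group.id" -> s"$groupPrefix-$topic")

  // Group ids that appear in more than one stream's parameters.
  def sharedGroupIds(paramsByStream: Map[String, Map[String, String]]): Set[String] =
    paramsByStream.values
      .flatMap(_.get("group.id"))
      .groupBy(identity)
      .collect { case (groupId, occurrences) if occurrences.size > 1 => groupId }
      .toSet
}
```

In the job's `createStream`, the result of `paramsFor(kafkaParams, group, topic)` would be passed to `ConsumerStrategies.Subscribe` in place of the shared `kafkaParams`.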
> >>>
> >>>
> >>> On Oct 11, 2016 4:46 AM, "Matthias Niehoff"
> >>> <matthias.niehoff@codecentric.de> wrote:
> >>>>
> >>>> I stripped down the job to just consume the stream and print it, without
> >>>> Avro deserialization. When I only consume one topic, everything is fine. As
> >>>> soon as I add a second stream, it gets stuck after about 5 minutes. So it
> >>>> basically boils down to:
> >>>>
> >>>>
> >>>>   val kafkaParams = Map[String, String](
> >>>>     "bootstrap.servers" -> kafkaBrokers,
> >>>>     "group.id" -> group,
> >>>>     "key.deserializer" -> classOf[StringDeserializer].getName,
> >>>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
> >>>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>>     "auto.offset.reset" -> "latest",
> >>>>     "enable.auto.commit" -> "false"
> >>>>   )
> >>>>
> >>>>   def main(args: Array[String]) {
> >>>>
> >>>>     def createStreamingContext(): StreamingContext = {
> >>>>
> >>>>       val sparkConf = new SparkConf(true)
> >>>>         .setAppName("Kafka Consumer Test")
> >>>>       sparkConf.setMaster("local[*]")
> >>>>
> >>>>       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
> >>>>
> >>>>       // AD REQUESTS
> >>>>       // ===========
> >>>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
> >>>>       serializedAdRequestStream.map(record => record.value().get()).print(10)
> >>>>
> >>>>       // VIEWS
> >>>>       // ======
> >>>>       val serializedViewStream = createStream(ssc, topic_view)
> >>>>       serializedViewStream.map(record => record.value().get()).print(10)
> >>>>
> >>>> //      // CLICKS
> >>>> //      // ======
> >>>> //      val serializedClickStream = createStream(ssc, topic_click)
> >>>> //      serializedClickStream.map(record => record.value().get()).print(10)
> >>>>
> >>>>       ssc
> >>>>     }
> >>>>
> >>>>     val streamingContext = createStreamingContext
> >>>>     streamingContext.start()
> >>>>     streamingContext.awaitTermination()
> >>>>   }
> >>>>
> >>>>
> >>>> And in the logs you see:
> >>>>
> >>>>
> >>>> 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
> >>>> 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
> >>>> 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
> >>>> 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
> >>>> 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
> >>>>
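The stall in the log above can be quantified with java.time: compare the batch time JobScheduler prints (epoch milliseconds) with the wall-clock time of the matching "Added jobs" line. The time zone is an assumption: Europe/Berlin, consistent with the poster's +02:00 mail headers.

For batch 1476100946000 ms, the jobs were added 60 seconds late. That equals the session.timeout.ms of 60000 configured in this job. This is consistent with, but does not prove, job generation blocking in the driver consumer's poll() while the group rebalances.

```scala
import java.time.{Duration, Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object GenerationLag {
  // Timestamp format of the Spark log lines above, e.g. "16/10/10 14:03:26".
  private val logFormat = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Difference between a batch's scheduled time (epoch ms, as printed by
  // JobScheduler) and the local wall-clock time at which a line was logged.
  def lag(batchTimeMs: Long, logTimestamp: String, zone: ZoneId): Duration = {
    val logged = LocalDateTime.parse(logTimestamp, logFormat).atZone(zone).toInstant
    Duration.between(Instant.ofEpochMilli(batchTimeMs), logged)
  }
}
```

By the same calculation, the previous batch (1476100944000 ms) finished only 2 seconds after its batch time.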
> >>>>
> >>>> 2016-10-11 9:28 GMT+02:00 Matthias Niehoff
> >>>> <matthias.niehoff@codecentric.de>:
> >>>>>
> >>>>> This job fails after about 5 minutes:
> >>>>>
> >>>>>
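> >>>>> // Imports inferred from the code below; GenericAvroCodecs is assumed to be
> >>>>> // Twitter bijection-avro's. AdRequestLog, Event and EventType are the job's
> >>>>> // own classes (not shown).
> >>>>> import java.util.UUID
> >>>>>
> >>>>> import com.twitter.bijection.avro.GenericAvroCodecs
> >>>>> import org.apache.avro.Schema
> >>>>> import org.apache.avro.generic.GenericRecord
> >>>>> import org.apache.kafka.clients.consumer.ConsumerRecord
> >>>>> import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
> >>>>> import org.apache.kafka.common.utils.Bytes
> >>>>> import org.apache.spark.SparkConf
> >>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
> >>>>> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
> >>>>> import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
> >>>>>
> >>>>> import scala.reflect.ClassTag
> >>>>>
> >>>>> object SparkJobMinimal {
> >>>>>
> >>>>>   // read Avro schemas
> >>>>>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
> >>>>>   val avroSchemaAdRequest = scala.io.Source.fromInputStream(stream).getLines.mkString
> >>>>>   stream.close
> >>>>>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
> >>>>>   val avroSchemaEvent = scala.io.Source.fromInputStream(stream).getLines.mkString
> >>>>>   stream.close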
> >>>>>
> >>>>>
> >>>>>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
> >>>>>
> >>>>>   val topic_adrequest = "adserving.log.ad_request"
> >>>>>   val topic_view = "adserving.log.view"
> >>>>>   val topic_click = "adserving.log.click"
> >>>>>   val group = UUID.randomUUID().toString
> >>>>>   val streaming_interval_seconds = 2
> >>>>>
> >>>>>   val kafkaParams = Map[String, String](
> >>>>>     "bootstrap.servers" -> kafkaBrokers,
> >>>>>     "group.id" -> group,
> >>>>>     "key.deserializer" -> classOf[StringDeserializer].getName,
> >>>>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
> >>>>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>>>     "auto.offset.reset" -> "latest",
> >>>>>     "enable.auto.commit" -> "false"
> >>>>>   )
> >>>>>
> >>>>>   def main(args: Array[String]) {
> >>>>>
> >>>>>     def createStreamingContext(): StreamingContext = {
> >>>>>
> >>>>>       val sparkConf = new SparkConf(true)
> >>>>>         .setAppName("Kafka Consumer Test")
> >>>>>       sparkConf.setMaster("local[*]")
> >>>>>
> >>>>>
> >>>>>       val ssc = new StreamingContext(sparkConf,
> >>>>> Seconds(streaming_interval_seconds))
> >>>>>
> >>>>>       // AD REQUESTS
> >>>>>       // ===========
> >>>>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
> >>>>>
> >>>>>       val adRequestStream = deserializeStream(serializedAdRequestStream,
> >>>>>         avroSchemaAdRequest, record => AdRequestLog(record)).cache()
> >>>>>       adRequestStream.print(10)
> >>>>>
> >>>>>       // VIEWS
> >>>>>       // ======
> >>>>>
> >>>>>       val serializedViewStream = createStream(ssc, topic_view)
> >>>>>       val viewStream = deserializeStream(serializedViewStream,
> >>>>> avroSchemaEvent, record => Event(record, EventType.View)).cache()
> >>>>>       viewStream.print(10)
> >>>>>
> >>>>>
> >>>>>       // CLICKS
> >>>>>       // ======
> >>>>>       val serializedClickStream = createStream(ssc, topic_click)
> >>>>>       val clickEventStream = deserializeStream(serializedClickStream,
> >>>>>         avroSchemaEvent, record => Event(record, EventType.Click)).cache()
> >>>>>       clickEventStream.print(10)
> >>>>>
> >>>>>       ssc
> >>>>>     }
> >>>>>
> >>>>>     val streamingContext = createStreamingContext
> >>>>>     streamingContext.start()
> >>>>>     streamingContext.awaitTermination()
> >>>>>   }
> >>>>>
> >>>>>   def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
> >>>>>     KafkaUtils.createDirectStream[String, Bytes](
> >>>>>       ssc,
> >>>>>       LocationStrategies.PreferConsistent,
> >>>>>       ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
> >>>>>   }
> >>>>>
> >>>>>   def deserializeStream[EventType: ClassTag](
> >>>>>       serializedAdRequestStream: InputDStream[ConsumerRecord[String, Bytes]],
> >>>>>       avroSchema: String,
> >>>>>       recordMapper: GenericRecord => EventType): DStream[EventType] = {
> >>>>>     serializedAdRequestStream.mapPartitions { iteratorOfMessages =>
> >>>>>       val schema: Schema = new Schema.Parser().parse(avroSchema)
> >>>>>       val recordInjection = GenericAvroCodecs.toBinary(schema)
> >>>>>       iteratorOfMessages
> >>>>>         .map(message => recordInjection.invert(message.value().get()))
> >>>>>         .filter(_.isSuccess)
> >>>>>         .map(_.get.asInstanceOf[GenericRecord])
> >>>>>         .map(recordMapper)
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>>
> >>>>> 2016-10-10 17:42 GMT+02:00 Matthias Niehoff
> >>>>> <matthias.niehoff@codecentric.de>:
> >>>>>>
> >>>>>> Yes, even without committing the offsets, the consumer rebalances.
> >>>>>> The job consumes 3 streams and processes them. When consuming only one
> >>>>>> stream it runs fine. But when consuming three streams, even without joining
> >>>>>> them, just deserializing the payload and triggering an output action, it fails.
> >>>>>>
> >>>>>> I will prepare a code sample.
> >>>>>>
> >>>>>> 2016-10-07 3:35 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>>>>>>
> >>>>>>> OK, so at this point, even without involving commitAsync, you're
> >>>>>>> seeing consumer rebalances after a particular batch takes longer than
> >>>>>>> the session timeout?
> >>>>>>>
> >>>>>>> Do you have a minimal code example you can share?
> >>>>>>>
> >>>>>>> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
> >>>>>>> <matthias.niehoff@codecentric.de> wrote:
> >>>>>>> > Hi,
> >>>>>>> > sorry for the late reply; there was a public holiday in Germany.
> >>>>>>> >
> >>>>>>> > Yes, it's using a unique group id which no other job or consumer group
> >>>>>>> > is using. I have increased session.timeout.ms to 1 minute and set
> >>>>>>> > max.poll.records to 1000. The processing takes ~1 second.
> >>>>>>> >
> >>>>>>> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>>>>>> >>
> >>>>>>> >> Well, I'd start at the first thing suggested by the error, namely that
> >>>>>>> >> the group has rebalanced.
> >>>>>>> >>
> >>>>>>> >> Is that stream using a unique group id?
> >>>>>>> >>
> >>>>>>> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
> >>>>>>> >> <matthias.niehoff@codecentric.de> wrote:
> >>>>>>> >> > Hi,
> >>>>>>> >> >
> >>>>>>> >> > the stacktrace:
> >>>>>>> >> >
> >>>>>>> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> >>>>>>> >> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> >>>>>>> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> >>>>>>> >> >     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> >>>>>>> >> >     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >>>>>>> >> >     at scala.Option.orElse(Option.scala:289)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >>>>>>> >> >     at scala.Option.orElse(Option.scala:289)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> >>>>>>> >> >     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> >>>>>>> >> >     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> >>>>>>> >> >     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> >>>>>>> >> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >>>>>>> >> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >>>>>>> >> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>>>>>> >> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> >>>>>>> >> >     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> >>>>>>> >> >     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> >>>>>>> >> >     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> >>>>>>> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
> >>>>>>> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
> >>>>>>> >> >     at scala.util.Try$.apply(Try.scala:192)
> >>>>>>> >> >     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
> >>>>>>> >> >     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
> >>>>>>> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> >>>>>>> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> >>>>>>> >> >     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >>>>>>> >> >
> >>>>>>> >> > But it seems like the commit is not the actual problem. The job also falls
> >>>>>>> >> > behind if I do not commit the offsets. The delay would be OK if the
> >>>>>>> >> > processing time were longer than the batch interval, but that's not the
> >>>>>>> >> > case in any of the microbatches. IMHO, for some reason one of the
> >>>>>>> >> > microbatches falls behind by more than session.timeout.ms. Then the
> >>>>>>> >> > consumers regroup, which takes about 1 minute (see timestamps below).
> >>>>>>> >> > Now a cycle of slow batches begins, each triggering a consumer regroup.
> >>>>>>> >> > Could this be the case?
> >>>>>>> >> >
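The cycle described above can be illustrated with a toy scheduling model. This is not Spark's JobScheduler. In the model, batches are due every interval, run one at a time, and a single batch is held back by a stall, such as waiting for a rebalance. The interval and processing time in the check loosely follow the 2 s batches and ~0.7 s execution seen in this thread's logs. The 60 s stall is an assumption matching session.timeout.ms.

```scala
object DelaySimulation {
  // Toy model: batch i is due at i * intervalMs. It cannot start before its due
  // time plus its stall, nor before batch i-1 has finished (one job at a time).
  // Returns each batch's total delay: finish time minus due time.
  def totalDelays(intervalMs: Long, processingMs: Long, stallsMs: Seq[Long]): Seq[Long] = {
    var prevEnd = Long.MinValue
    stallsMs.zipWithIndex.map { case (stall, i) =>
      val due = i * intervalMs
      val start = math.max(due + stall, prevEnd)
      val end = start + processingMs
      prevEnd = end
      end - due
    }
  }
}
```

After the stall, each batch recovers only the interval minus the processing time (1.3 s in the check). A backlog of roughly 60 s therefore takes dozens of batches to drain. Another stall before the backlog drains makes the delay grow further, which matches the pattern described above.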
> >>>>>>> >> >
> >>>>>>> >> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
> >>>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
> >>>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
> >>>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
> >>>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
> >>>>>>> >> >
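The annotation on the first log line can be checked with java.time. JobScheduler prints batch times as epoch milliseconds, so adding the reported total delay gives the completion time. Treating the log timestamps as UTC is an assumption, consistent with the poster's "08:13:34" note.

```scala
import java.time.{Duration, Instant}

object BatchTimes {
  // Batch time (epoch ms, as printed by JobScheduler) plus the reported total
  // delay gives the instant at which the batch's jobs completed.
  def completion(batchTimeMs: Long, totalDelay: Duration): Instant =
    Instant.ofEpochMilli(batchTimeMs).plus(totalDelay)
}
```

Batch 1475050414000 ms corresponds to 2016-09-28T08:13:34Z. Adding 141.580 s lands at 08:15:55.580, matching the log line's 08:15:55. The group rejoin follows less than a minute later, at 08:16:48.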
> >>>>>>> >> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>>>>>> >> >>
> >>>>>>> >> >> What's the actual stacktrace / exception you're getting related to
> >>>>>>> >> >> commit failure?
> >>>>>>> >> >>
> >>>>>>> >> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
> >>>>>>> >> >> <matthias.niehoff@codecentric.de> wrote:
> >>>>>>> >> >> > Hi everybody,
> >>>>>>> >> >> >
> >>>>>>> >> >> > I am using the new Kafka consumer for Spark Streaming in my job.
> >>>>>>> >> >> > With the old consumer it runs fine.
> >>>>>>> >> >> >
> >>>>>>> >> >> > The job consumes 3 topics, saves the data to Cassandra, cogroups the
> >>>>>>> >> >> > topics, calls mapWithState and stores the results in Cassandra. After
> >>>>>>> >> >> > that I manually commit the Kafka offsets using the commitAsync method
> >>>>>>> >> >> > of the KafkaDStream.
> >>>>>>> >> >> >
> >>>>>>> >> >> > With the new consumer I experience the following problem:
> >>>>>>> >> >> >
> >>>>>>> >> >> > After a certain amount of time (about 4-5 minutes, might be more or
> >>>>>>> >> >> > less) exceptions are thrown saying the offset commit failed. The
> >>>>>>> >> >> > processing takes less than the batch interval. I also adjusted
> >>>>>>> >> >> > session.timeout.ms and request.timeout.ms as well as max.poll.records,
> >>>>>>> >> >> > which did not help.
> >>>>>>> >> >> >
> >>>>>>> >> >> > After the first offset commit fails, the time from Kafka until the
> >>>>>>> >> >> > microbatch starts increases, while the processing time stays
> >>>>>>> >> >> > constantly below the batch interval. Moreover, further offset commits
> >>>>>>> >> >> > also fail, and as a result the delay builds up.
> >>>>>>> >> >> >
> >>>>>>> >> >> > Has anybody else experienced this?
> >>>>>>> >> >> >
> >>>>>>> >> >> > Thank you
> >>>>>>> >> >> >
> >>>>>>> >> >> > Relevant Kafka parameters:
> >>>>>>> >> >> >
> >>>>>>> >> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>>>>> >> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>>>>> >> >> > "auto.offset.reset" -> "latest", // "largest" is the old consumer's value; the 0.10 consumer accepts "latest", "earliest" or "none"
> >>>>>>> >> >> > "enable.auto.commit" -> "false",
> >>>>>>> >> >> > "max.poll.records" -> "1000"
> >>>>>>> >> >> >
> >>>>>>> >> >> >
> >>>>>>> >> >> >
> >>>>>>> >> >> > --
> >>>>>>> >> >> > Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
> >>>>>>> >> >> > codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> >>>>>>> >> >> > tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobile: +49 (0) 172.1702676
> >>>>>>> >> >> > www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
> >>>>>>> >> >> >
> >>>>>>> >> >> > Registered office: Solingen | HRB 25917 | Wuppertal Local Court
> >>>>>>> >> >> > Executive Board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> >>>>>>> >> >> > Supervisory Board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen Schütz
> >>>>>>> >> >> >
> >>>>>>> >> >> > This e-mail, including any attached files, contains confidential and/or
> >>>>>>> >> >> > legally protected information. If you are not the intended recipient or
> >>>>>>> >> >> > have received this e-mail in error, please notify the sender immediately
> >>>>>>> >> >> > and delete this e-mail and any attached files without delay. Unauthorized
> >>>>>>> >> >> > copying, use or opening of any attached files, as well as unauthorized
> >>>>>>> >> >> > forwarding of this e-mail, is not permitted.
> >>
> >>
> >>
> >>
> >> --
> >> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> >> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> >> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> >> 172.1702676
> >> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> >> www.more4fi.de
> >>
> >> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> >> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> >> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
> >> Schütz
> >>
> >> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
> vertrauliche
> >> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> >> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren
> Sie
> >> bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter
> >> Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen evtl.
> >> beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht
> >> gestattet
> >>
> >>
> >


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Germany
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobile: +49 (0) 172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de

Registered office: Solingen | HRB 25917 | Registry court: Amtsgericht Wuppertal
Executive board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Supervisory board: Patric Fedlmeier (Chair) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or
legally protected information. If you are not the intended recipient or
have received this e-mail in error, please notify the sender immediately
and delete this e-mail and any attached files without delay. Unauthorized
copying, use, or opening of any attached files, as well as unauthorized
forwarding of this e-mail, is not permitted.
