spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: [Spark streaming] No assigned partition error during seek
Date Thu, 30 Nov 2017 21:09:27 GMT
You mentioned version 0.11; the latest version of the org.apache.kafka
kafka-clients artifact supported by DStreams is 0.10.0.1, and the Spark
Kafka integration already declares an appropriate dependency on it.

Are you manually depending on a different version of the kafka-clients artifact?
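
One way to check which kafka-clients version actually ended up on the
classpath is to print it at driver startup. This is only a minimal sketch. It
assumes the client jar in use ships
org.apache.kafka.common.utils.AppInfoParser with a static getVersion()
method. KafkaClientVersionCheck and detect are hypothetical names. Reflection
is used so the snippet compiles and runs even when no Kafka jar is present:

```java
import java.lang.reflect.Method;
import java.util.Optional;

class KafkaClientVersionCheck {

    // Returns the value of the static getVersion() method on the named class,
    // or empty if the class or method cannot be loaded.
    // Assumption: for kafka-clients, the class to pass is
    // org.apache.kafka.common.utils.AppInfoParser.
    static Optional<String> detect(String className) {
        try {
            Class<?> cls = Class.forName(className);
            Method m = cls.getMethod("getVersion");
            Object version = m.invoke(null);
            return Optional.ofNullable(version).map(Object::toString);
        } catch (ReflectiveOperationException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String v = detect("org.apache.kafka.common.utils.AppInfoParser")
            .orElse("not found");
        System.out.println("kafka-clients version on classpath: " + v);
    }
}
```

If this prints something other than 0.10.0.1, another dependency is probably
overriding the version the integration expects.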

On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <mevenkat@gmail.com> wrote:
> Version: 2.2 with Kafka010
>
> Hi,
>
> We are running spark streaming on AWS and trying to process incoming
> messages on Kafka topics. All was well.
> Recently we wanted to migrate from version 0.8 to version 0.11 of the Spark
> library and to version 0.11 of the Kafka server.
>
> With this new version of the software we intermittently hit a 'No current
> assignment for partition' error for a topic. I
> construct four DStreams with different group.ids, as suggested.
>
> The main piece of code that's causing the issue is this one:
>
> if (!toSeek.isEmpty) {
>       // work around KAFKA-3370 when reset is none
>       // poll will throw if no position, i.e. auto offset reset none and no explicit position
>       // but can't seek to a position before poll, because poll is what gets subscription partitions
>       // So, poll, suppress the first exception, then seek
>       val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>       val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>       try {
>         consumer.poll(0)
>       } catch {
>         case x: NoOffsetForPartitionException if shouldSuppress =>
>           logWarning("Catching NoOffsetForPartitionException since " +
>             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
>       }
>       toSeek.asScala.foreach { case (topicPartition, offset) =>
>           consumer.seek(topicPartition, offset) // <-- the call that throws
>       }
>     }
>
> At the start of the job, I also ensure we are supplying all required offsets
> correctly
>
> private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>     Map<TopicPartition, Long> offsets = new HashMap<>();
>     List<TopicPartition> topicPartitions =
>         consumer.partitionsFor(topic).stream()
>             .map(partitionInfo ->
>                 new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>             .collect(Collectors.toList());
>     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
>     // pick committed offsets
>     for (TopicPartition topicAndPartition : topicPartitions) {
>       final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>       Long earliestOffset = earliestOffsets.get(topicAndPartition);
>       if (committed != null && committed.offset() > earliestOffset) {
>         logger.warn(
>             "Committed offset found for: {} offset:{} -> Hence adding committed offset",
>             topicAndPartition, committed.offset());
>         offsets.put(topicAndPartition, committed.offset());
>       } else {
>         logger.warn(
>             "New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>             topicAndPartition, earliestOffset);
>         offsets.put(topicAndPartition, earliestOffset);
>       }
>     }
>     return offsets;
>   }
>
> The actual stack trace:
>
> Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
> 2017-11-23 10:35:24,678 -    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 2017-11-23 10:35:24,678 -    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 2017-11-23 10:35:24,678 -    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 2017-11-23 10:35:24,678 -    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> 2017-11-23 10:35:24,681 -    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> 2017-11-23 10:35:24,681 -    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> 2017-11-23 10:35:24,681 -    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> 2017-11-23 10:35:24,681 -    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
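
The offset-selection rule in the quoted getCommittedOffsets method (use the
committed offset only when it exists and is ahead of the earliest retained
offset, otherwise use the earliest) can be pulled out into a pure function
and unit tested without a broker. This is only a sketch. StartOffsets, choose
and chooseAll are hypothetical names, and partitions are keyed by plain
strings here rather than TopicPartition so the snippet has no Kafka
dependency:

```java
import java.util.HashMap;
import java.util.Map;

class StartOffsets {

    // Use the committed offset only when it exists and is ahead of the
    // earliest retained offset; otherwise fall back to the earliest offset.
    static long choose(Long committed, long earliest) {
        return (committed != null && committed > earliest) ? committed : earliest;
    }

    // Applies the rule to every partition that has an earliest offset.
    // Partitions missing from the committed map are treated as new.
    static Map<String, Long> chooseAll(Map<String, Long> committed,
                                       Map<String, Long> earliest) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : earliest.entrySet()) {
            result.put(e.getKey(), choose(committed.get(e.getKey()), e.getValue()));
        }
        return result;
    }
}
```

This covers only which offset is chosen, not whether the partition is
assigned to the consumer when seek is called.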
