spark-user mailing list archives

From venkat <meven...@gmail.com>
Subject Re: [Spark streaming] No assigned partition error during seek
Date Fri, 01 Dec 2017 01:16:53 GMT
After your query, I noticed this in the Spark docs: "*Do not* manually add
dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The
spark-streaming-kafka-0-10 artifact has the appropriate transitive
dependencies already, and different versions may be incompatible in hard to
diagnose ways."

Does this imply that we should not be adding kafka-clients to our jars?
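
For context, here is a minimal sketch (not our exact build) of what the
dependency section might look like if we follow that advice, assuming sbt
and Spark 2.2.0; the point is simply that no explicit kafka-clients entry
appears:

  // build.sbt (illustrative)
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
    // the 0-10 integration pulls in its supported kafka-clients transitively
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
    // note: no explicit "org.apache.kafka" % "kafka-clients" entry
  )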

Thanks
Venkat

On Fri, 1 Dec 2017 at 06:45 venkat <mevenkat@gmail.com> wrote:

> Yes, I use the latest Kafka clients (0.11) to determine beginning offsets
> without seek, and I also commit Kafka offsets externally.
> I don't find Spark's async commit useful for our needs.
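>
> For illustration, a rough sketch of what our external commit looks like
> conceptually, using a plain KafkaConsumer (names and types here are
> illustrative, not our exact code):
>
>   import scala.collection.JavaConverters._
>   import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
>   import org.apache.kafka.common.TopicPartition
>   import org.apache.spark.streaming.kafka010.OffsetRange
>
>   // commit the offsets of a finished batch from the driver,
>   // using a standalone consumer created with the same group.id
>   def commitExternally(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
>                        ranges: Array[OffsetRange]): Unit = {
>     val toCommit = ranges.map { r =>
>       new TopicPartition(r.topic, r.partition) -> new OffsetAndMetadata(r.untilOffset)
>     }.toMap.asJava
>     consumer.commitSync(toCommit)
>   }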
>
> Thanks
> Venkat
>
> On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <cody@koeninger.org> wrote:
>
>> You mentioned the 0.11 version; the latest version of the org.apache.kafka
>> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
>> already has the appropriate transitive dependency.
>>
>> Are you manually depending on a different version of the kafka-clients
>> artifact?
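>>
>> For what it's worth, one quick way to check which kafka-clients version
>> actually ends up on your classpath (AppInfoParser ships inside
>> kafka-clients itself) is a snippet along these lines:
>>
>>   import org.apache.kafka.common.utils.AppInfoParser
>>   // prints the kafka-clients version loaded at runtime
>>   println(s"kafka-clients version on classpath: ${AppInfoParser.getVersion}")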
>>
>> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <mevenkat@gmail.com> wrote:
>> > Version: 2.2 with Kafka010
>> >
>> > Hi,
>> >
>> > We are running Spark Streaming on AWS, processing incoming messages on
>> > Kafka topics. All was well.
>> > Recently we wanted to migrate from the 0.8 to the 0.11 version of the
>> > Spark Kafka library, and to the 0.11 version of the Kafka server.
>> >
>> > With the new versions we are intermittently hitting a 'No assignment to
>> > partition' error for a topic. I construct four DStreams with different
>> > group.ids, as suggested.
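>> >
>> > Roughly, each of the four streams is built like the sketch below (topic
>> > name, group.id and variable names are illustrative, not our exact code;
>> > the offsets map is the one built by getCommittedOffsets further down):
>> >
>> >   import org.apache.kafka.common.TopicPartition
>> >   import org.apache.kafka.common.serialization.StringDeserializer
>> >   import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
>> >
>> >   // offsets: assumed converted from the java.util.Map returned by getCommittedOffsets
>> >   val offsets: Map[TopicPartition, java.lang.Long] = ???
>> >
>> >   val kafkaParams = Map[String, Object](
>> >     "bootstrap.servers" -> brokers,           // brokers: assumed defined elsewhere
>> >     "key.deserializer" -> classOf[StringDeserializer],
>> >     "value.deserializer" -> classOf[StringDeserializer],
>> >     "group.id" -> "genericEvents-group-1")    // a distinct group.id per DStream
>> >
>> >   val stream = KafkaUtils.createDirectStream[String, String](
>> >     ssc,                                      // ssc: the StreamingContext
>> >     LocationStrategies.PreferConsistent,
>> >     ConsumerStrategies.Subscribe[String, String](Seq("genericEvents"), kafkaParams, offsets))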
>> >
>> > The main piece of code that's causing the issue is this one (from
>> > Spark's ConsumerStrategy.scala):
>> > if (!toSeek.isEmpty) {
>> >   // work around KAFKA-3370 when reset is none
>> >   // poll will throw if no position, i.e. auto offset reset none and no explicit position
>> >   // but cant seek to a position before poll, because poll is what gets subscription partitions
>> >   // So, poll, suppress the first exception, then seek
>> >   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>> >   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>> >   try {
>> >     consumer.poll(0)
>> >   } catch {
>> >     case x: NoOffsetForPartitionException if shouldSuppress =>
>> >       logWarning("Catching NoOffsetForPartitionException since " +
>> >         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
>> >   }
>> >   toSeek.asScala.foreach { case (topicPartition, offset) =>
>> >     *consumer.seek(topicPartition, offset)*
>> >   }
>> > }
>> >
>> > At the start of the job, I also ensure we are supplying all required
>> > offsets correctly:
>> >
>> > private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>> >   Map<TopicPartition, Long> offsets = new HashMap<>();
>> >   List<TopicPartition> topicPartitions =
>> >       consumer.partitionsFor(topic).stream()
>> >           .map(partitionInfo ->
>> >               new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>> >           .collect(Collectors.toList());
>> >   Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
>> >   // pick committed offsets
>> >   for (TopicPartition topicAndPartition : topicPartitions) {
>> >     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>> >     Long earliestOffset = earliestOffsets.get(topicAndPartition);
>> >     if (committed != null && committed.offset() > earliestOffset) {
>> >       logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
>> >           topicAndPartition, committed.offset());
>> >       offsets.put(topicAndPartition, committed.offset());
>> >     } else {
>> >       logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>> >           topicAndPartition, earliestOffset);
>> >       offsets.put(topicAndPartition, earliestOffset);
>> >     }
>> >   }
>> >   return offsets;
>> > }
>> >
>> > The actual stack trace:
>> >
>> > Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
>> >   at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
>> >   at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
>> >   at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
>> >   at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
>> >   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
>> >   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
>> >   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
>> >   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>> >   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>> >   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>> >   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>> >   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>> >   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>> >   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> >   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> >   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>> >   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>> >   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
>> >   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
>> >   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
>> >   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>> >   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>> >   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>
