spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: [Spark streaming] No assigned partition error during seek
Date Fri, 01 Dec 2017 15:54:53 GMT
Yeah, don't mix multiple versions of kafka clients.  That's not 100%
certain to be the cause of your problem, but it can't be helping.
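
If it helps, a minimal sbt setup that avoids the mix looks roughly like this (a
sketch, assuming Spark 2.2; the point is to not declare kafka-clients yourself
and let the integration artifact pull it in transitively):

  // build.sbt sketch: rely on the transitive kafka-clients (0.10.0.1) that
  // spark-streaming-kafka-0-10 already brings in.
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
    // no explicit "org.apache.kafka" % "kafka-clients" % ... entry
  )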

As for your comments about async commits, read

https://issues.apache.org/jira/browse/SPARK-22486

and if you think your use case is still relevant to others given those
constraints, then share it.

On Fri, Dec 1, 2017 at 4:11 AM, Qiao, Richard
<Richard.Qiao@capitalone.com> wrote:
> In your case, it looks like two versions of the Kafka client are loaded in
> the same JVM at runtime, so there is a version conflict.
>
>
>
> About “I don't find the Spark async commit useful for our needs”, do you
> mean code like the line below?
>
> kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
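
For reference, the usual shape of that call, per the kafka-0-10 integration
guide, is roughly the following (a sketch; stream here stands for the DStream
returned by KafkaUtils.createDirectStream):

  import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

  stream.foreachRDD { rdd =>
    // capture this batch's offset ranges before any shuffle/repartition
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process the batch ...
    // then have the driver-side consumer commit them back to Kafka, asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }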
>
>
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> From: venkat <mevenkat@gmail.com>
> Date: Thursday, November 30, 2017 at 8:16 PM
> To: Cody Koeninger <cody@koeninger.org>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: [Spark streaming] No assigned partition error during seek
>
>
>
> After your query I noticed the note 'Do not manually add dependencies on
> org.apache.kafka artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10
> artifact has the appropriate transitive dependencies already, and different
> versions may be incompatible in hard to diagnose ways' in the documentation.
>
> Does this imply that we should not be adding kafka-clients to our jars?
>
> Thanks
>
> Venkat
>
>
>
> On Fri, 1 Dec 2017 at 06:45 venkat <mevenkat@gmail.com> wrote:
>
> Yes, I use the latest Kafka clients (0.11) to determine beginning offsets
> without seeking, and I also commit Kafka offsets externally.
>
> I don't find the Spark async commit useful for our needs.
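
If I follow, that would look something like the rough sketch below (placeholder
names only; note that beginningOffsets needs kafka-clients 0.10.1 or newer,
which is presumably why the 0.11 jar is in the picture):

  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
  import org.apache.kafka.common.TopicPartition

  // Rough sketch: look up beginning offsets without seeking, then commit the
  // processed offsets from outside Spark with a plain consumer.
  def commitExternally(consumer: KafkaConsumer[String, String],
                       processed: Map[TopicPartition, Long]): Unit = {
    val beginnings = consumer.beginningOffsets(processed.keySet.asJava) // no seek
    val toCommit = processed.map { case (tp, offset) =>
      // placeholder policy: never commit below the beginning of the log
      tp -> new OffsetAndMetadata(math.max(offset, beginnings.get(tp).longValue()))
    }
    consumer.commitSync(toCommit.asJava)
  }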
>
> Thanks
>
> Venkat
>
>
>
> On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <cody@koeninger.org> wrote:
>
> You mentioned version 0.11; the latest version of the org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it has an
> appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <mevenkat@gmail.com> wrote:
>> Version: 2.2 with Kafka010
>>
>> Hi,
>>
>> We are running Spark Streaming on AWS and processing incoming messages on
>> Kafka topics. All was well. Recently we wanted to migrate from the 0.8 to the
>> 0.11 version of the Spark Kafka library, and to Kafka 0.11 on the server side.
>>
>> With this new version of the software we intermittently hit a 'No current
>> assignment for partition' error for a topic. I construct four DStreams with
>> different group.ids, as suggested.
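
For context, I'd assume the construction is roughly along these lines (a sketch
only; broker address, topic names, deserializers and the auto.offset.reset
setting are placeholders):

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.kafka010._

  // One direct stream per topic, each with its own group.id.
  def streamFor(ssc: StreamingContext, topic: String, groupId: String) = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",              // placeholder
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> groupId,                      // distinct per stream
      "auto.offset.reset"  -> "none",                       // the workaround below only fires for none
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams))
  }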
>>
>> The code that is causing the issue is this block (from Spark's ConsumerStrategy.scala):
>>
>> if (!toSeek.isEmpty) {
>>   // work around KAFKA-3370 when reset is none
>>   // poll will throw if no position, i.e. auto offset reset none and no explicit position
>>   // but cant seek to a position before poll, because poll is what gets subscription partitions
>>   // So, poll, suppress the first exception, then seek
>>   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>>   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>>   try {
>>     consumer.poll(0)
>>   } catch {
>>     case x: NoOffsetForPartitionException if shouldSuppress =>
>>       logWarning("Catching NoOffsetForPartitionException since " +
>>         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
>>   }
>>   toSeek.asScala.foreach { case (topicPartition, offset) =>
>>     consumer.seek(topicPartition, offset)   // <-- this is the seek that throws
>>   }
>> }
>>
>> At the start of the job, I also make sure we supply all the required offsets
>> correctly:
>>
>> private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>>   Map<TopicPartition, Long> offsets = new HashMap<>();
>>   List<TopicPartition> topicPartitions =
>>       consumer.partitionsFor(topic).stream()
>>           .map(partitionInfo ->
>>               new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>>           .collect(Collectors.toList());
>>   Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
>>   // pick committed offsets
>>   for (TopicPartition topicAndPartition : topicPartitions) {
>>     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>>     Long earliestOffset = earliestOffsets.get(topicAndPartition);
>>     if (committed != null && committed.offset() > earliestOffset) {
>>       logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
>>           topicAndPartition, committed.offset());
>>       offsets.put(topicAndPartition, committed.offset());
>>     } else {
>>       logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>>           topicAndPartition, earliestOffset);
>>       offsets.put(topicAndPartition, earliestOffset);
>>     }
>>   }
>>   return offsets;
>> }
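
For what it's worth, I'd assume that map then provides the starting positions
for the direct stream, roughly like this sketch (ssc, topics and kafkaParams
are placeholders, and the Java map is converted to a Scala one first):

  import scala.collection.JavaConverters._
  import org.apache.spark.streaming.kafka010._

  // Hand the offsets computed by getCommittedOffsets(...) to Subscribe so the
  // stream starts from them instead of relying on auto.offset.reset.
  val startOffsets = getCommittedOffsets(topic).asScala
    .map { case (tp, offset) => tp -> offset.longValue() }
    .toMap
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, startOffsets))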
>>
>> The actual stack trace:
>>
>> Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
>> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
>> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
>> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
>> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
>> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
>> 2017-11-23 10:35:24,678 -    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> 2017-11-23 10:35:24,678 -    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> 2017-11-23 10:35:24,678 -    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> 2017-11-23 10:35:24,678 -    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
>> 2017-11-23 10:35:24,681 -    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
>> 2017-11-23 10:35:24,681 -    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>> 2017-11-23 10:35:24,681 -    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>> 2017-11-23 10:35:24,681 -    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org

