spark-issues mailing list archives

From "Jungtaek Lim (Jira)" <>
Subject [jira] [Commented] (SPARK-30208) A race condition when reading from Kafka in PySpark
Date Thu, 12 Dec 2019 06:41:00 GMT


Jungtaek Lim commented on SPARK-30208:

Please correct me if I'm missing anything here.

I haven't looked into the details of SPARK-22340, but the sentence here ("reading from Kafka is
actually happening in a separate writer thread rather than the task thread") seems to still be
valid with SPARK-22340.

Please refer to KafkaSourceRDD: when we release the Kafka consumer, we assume there is no further
use of the consumer, because either the iterator has been exhausted or the task completion listener
has been called. PythonRunner may break that assumption, since it also interrupts the writer thread
from its task completion callback. (It doesn't even join the writer thread, which might be another
problem in itself, but for simplicity let's assume the interruption takes effect immediately.)
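The interrupt-without-join point can be sketched outside Spark. This is a minimal, hypothetical example (the loop and the `writerFinished` flag are illustrative, not Spark code): interrupting a thread does not guarantee it has stopped before the interrupter's next statement runs; only `join()` gives that guarantee.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Set only once the writer thread has really finished its cleanup.
val writerFinished = new AtomicBoolean(false)

val writer = new Thread(() => {
  try {
    // Simulate a writer thread blocked in a read loop, as PythonRunner's is.
    while (true) Thread.sleep(10)
  } catch {
    case _: InterruptedException => () // interruption ends the loop
  } finally {
    writerFinished.set(true)
  }
})

writer.start()
Thread.sleep(50)
writer.interrupt() // what the task-completion callback does
// Right here writerFinished may still be false: the interrupt has been
// delivered, but the writer may not have observed it yet.
writer.join() // joining is what actually guarantees the writer is done
println(s"writer finished after join: ${writerFinished.get}") // true
```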

So, depending on the order of registration, the callback registered by KafkaSourceRDD may be
invoked earlier than the callback registered by PythonRunner, in which case the task thread tries
to release the consumer while the writer thread started by PythonRunner has not yet been told that
the task is complete.
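A minimal sketch of that ordering problem, with hypothetical listener bodies standing in for the real KafkaSourceRDD and PythonRunner callbacks (this is not Spark's TaskContextImpl, just the shape of the hazard):

```scala
import scala.collection.mutable.ArrayBuffer

// Completion callbacks fire in whatever order the context invokes them, so a
// callback that releases a resource can run before the callback that stops
// the thread still using that resource.
val invoked   = ArrayBuffer.empty[String]
val listeners = ArrayBuffer.empty[() => Unit]

// KafkaSourceRDD-style listener: releases the consumer on task completion.
listeners += (() => invoked += "release consumer")
// PythonRunner-style listener: interrupts the writer thread on completion.
listeners += (() => invoked += "interrupt writer")

// If the release callback is invoked first, the consumer is closed while
// the writer thread may still be reading from it.
listeners.foreach(_.apply())
println(invoked.mkString(", ")) // release consumer, interrupt writer
```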


> A race condition when reading from Kafka in PySpark
> ---------------------------------------------------
>                 Key: SPARK-30208
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Jiawen Zhu
>            Priority: Major
> When using PySpark to read from Kafka, there is a race condition in which Spark may use the
> KafkaConsumer from multiple threads at the same time and throw the following error:
> {code}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>         at
>         at
>         at
>         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451)
>         at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508)
>         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126)
>         at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
>         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131)
>         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130)
>         at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162)
>         at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>         at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>         at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144)
>         at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142)
>         at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130)
>         at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155)
>         at
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
>         at org.apache.spark.executor.Executor$
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         at java.util.concurrent.ThreadPoolExecutor$
>         at
> {code}
> When using PySpark, reading from Kafka actually happens in a separate writer thread
> rather than the task thread. When a task terminates early (e.g., when there is a limit
> operator), the task thread may stop the KafkaConsumer while the writer thread is still using it.
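The race in the description can be reproduced in miniature with a hypothetical single-thread guard in the spirit of KafkaConsumer's (the class and method names here are made up for illustration; the real consumer's check is more elaborate):

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong

// A method records its calling thread; a second thread calling while the
// first still holds the consumer fails, as in the stack trace above.
class SingleThreadedConsumer {
  private val owner = new AtomicLong(-1L)

  private def acquire(): Unit = {
    if (!owner.compareAndSet(-1L, Thread.currentThread().getId))
      throw new ConcurrentModificationException(
        "consumer is not safe for multi-threaded access")
  }
  private def release(): Unit = owner.set(-1L)

  // Poll that signals once it holds the consumer and holds it until told.
  def pollUntil(holding: CountDownLatch, mayFinish: CountDownLatch): Unit = {
    acquire()
    try { holding.countDown(); mayFinish.await() } finally release()
  }
  def close(): Unit = { acquire(); release() }
}

val consumer  = new SingleThreadedConsumer
val holding   = new CountDownLatch(1)
val mayFinish = new CountDownLatch(1)

// "Writer" thread reading from the consumer.
val writer = new Thread(() => consumer.pollUntil(holding, mayFinish))
writer.start()
holding.await() // wait until the writer really holds the consumer

// "Task" thread closing the consumer while the writer still uses it.
val closeFailed =
  try { consumer.close(); false }
  catch { case _: ConcurrentModificationException => true }

mayFinish.countDown()
writer.join()
println(s"close() during poll failed: $closeFailed") // true
```

The latches make the overlap deterministic here; in Spark the overlap depends on timing between the task thread's completion callbacks and the writer thread, which is why the failure is intermittent.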

This message was sent by Atlassian Jira

