spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "bharath kumar avusherla (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
Date Wed, 08 Aug 2018 18:52:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573700#comment-16573700
] 

bharath kumar avusherla edited comment on SPARK-23050 at 8/8/18 6:51 PM:
-------------------------------------------------------------------------

Is there any way we can avoid happening this?

We also recently observed the same issue when reading from Kafka topic and storing the output
to the S3 (and checkpointing in S3) using spark structured streaming 2.3.0.


was (Author: abharath9):
Is there any way we can avoid happening this?

We also recently observed the same issue when reading from Kafka topic and storing the output
to the S3 (and checkpointing in S3). And we are spark structured streaming 2.3.0.

> Structured Streaming with S3 file source duplicates data because of eventual consistency.
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-23050
>                 URL: https://issues.apache.org/jira/browse/SPARK-23050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yash Sharma
>            Priority: Major
>
> Spark Structured streaming with S3 file source duplicates data because of eventual consistency.
> Re producing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the files have
been written to Filesystem. {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and fails the
task. {{org.apache.spark.SparkException: Task failed while writing rows. No such file or directory
's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this time. {{ManifestFileCommitProtocol.newTaskTempFile.
part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written to S3.
> - There is no data duplication if spark is able to list presence of all committed files
and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================>                            (277 + 100) / 597]18/01/11
03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  org.apache.spark.SparkException:
Task failed while writing rows
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>  	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  	at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>  	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  	at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>  	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>  	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>  	at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>  	at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>  	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  	at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>  	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>  	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>  	... 8 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message