spark-user mailing list archives

From Shing Hing Man <mat...@yahoo.com.INVALID>
Subject Spark 1.6.2 Concurrent append to a HDFS folder with different partition key
Date Sat, 24 Sep 2016 16:12:55 GMT

I am trying to prototype using a single SQLContext instance to append DataFrames, partitioned
by a field, to the same HDFS folder from multiple threads. (Each thread works with a DataFrame
that has a different partition column value.)
I get the following exception:

16/09/24 16:45:12 ERROR [ForkJoinPool-3-worker-13] InsertIntoHadoopFsRelation: Aborting job.
java.io.FileNotFoundException: File hdfs://localhost:9000/user/temp/person/_temporary/0/task_201609241645_0001_m_000000/country=UK-1 does not exist.
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:644)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:92)
    at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:702)
    at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:698)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:698)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

Below is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ConcurrentAppends {

  val outputDir = "hdfs://localhost:9000/user/temp/person"

  def main(args: Array[String]): Unit = {
    val sqlContext = {
      val config = new SparkConf().setAppName("test").setIfMissing("spark.master", "local[*]")
      val sc = new SparkContext(config)
      new SQLContext(sc)
    }

    // Two concurrent appends to the same output folder.
    val futureA = Future(badAppend(sqlContext))
    val futureB = Future(badAppend(sqlContext))

    val result: Future[Long] = for {
      countA <- futureA
      countB <- futureB
    } yield countA + countB

    val timeout = 60.seconds
    val count = Await.result(result, timeout)
    println("Count=" + count)
  }

  /** Appends some rows to the folder person. */
  def badAppend(sqlContext: SQLContext): Long = {
    println(
      s"""
         |sqlContext=${sqlContext.hashCode()}
         |thread=${Thread.currentThread().getName}
         |""".stripMargin)
    val personsDF: DataFrame = persons(sqlContext)
    personsDF.write.partitionBy("country").mode(SaveMode.Append).save(outputDir)
    personsDF.count
  }

  /** @return A DataFrame of rows. */
  def persons(sqlContext: SQLContext, rowsPerCountry: Int = 100): DataFrame = {
    val personSchema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("gender", StringType, nullable = true),
      StructField("country", StringType, nullable = true)
    ))
    val noOfCountry = 10
    val rows = for {
      countryIndex <- 0 until noOfCountry
      recIndex <- 0 until rowsPerCountry
    } yield Row(s"foo-$recIndex", 10, "male", s"UK-$countryIndex")
    val rdd = sqlContext.sparkContext.parallelize(rows)
    sqlContext.createDataFrame(rdd, personSchema)
  }
}
-------------------------------

The above issue is mentioned in https://issues.apache.org/jira/browse/SPARK-10109, which
is still open.
One way to get concurrent appends is to use some sort of sharding, so that each thread
writes to a different folder and therefore has its own temporary directory. A rough sketch
of what I mean follows.
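
Here is a minimal sketch based on badAppend above (the shardedAppend helper and the shard
path layout are just illustrations, not a tested design):

  /** Like badAppend, but each caller writes under its own shard directory,
    * so each write job gets its own _temporary directory and the output
    * committers do not touch each other's paths. (Illustrative only.) */
  def shardedAppend(sqlContext: SQLContext, shard: String): Long = {
    val personsDF: DataFrame = persons(sqlContext)
    val shardDir = s"$outputDir/$shard" // e.g. .../person/shard-0
    personsDF.write.partitionBy("country").mode(SaveMode.Append).save(shardDir)
    personsDF.count
  }

  // In main, each thread writes to a distinct shard:
  val futureA = Future(shardedAppend(sqlContext, "shard-0"))
  val futureB = Future(shardedAppend(sqlContext, "shard-1"))

The downside is that readers then have to know about the shard layout when they load the data.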
It would be very much appreciated if someone would share a better solution.
Thanks in advance for any suggestions!
Shing
