spark-user mailing list archives

From Cheng Lian <lian.cs....@gmail.com>
Subject Re: Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions
Date Wed, 17 Jun 2015 10:47:55 GMT
Since SPARK-8406 is serious, we hope to ship 1.4.1 ASAP, possibly next 
week, but I can't promise that yet. However, you can cherry-pick the 
commit as soon as the fix is merged into branch-1.4. Sorry for the 
trouble!

Cheng

On 6/17/15 1:42 AM, Nathan McCarthy wrote:
> Thanks Cheng. Nice find!
>
> Let me know if there is anything we can do to help on this end with 
> contributing a fix or testing.
>
> Side note - any ideas on the 1.4.1 ETA? There are a few bug fixes we 
> need in there.
>
> Cheers,
> Nathan
>
> From: Cheng Lian
> Date: Wednesday, 17 June 2015 6:25 pm
> To: Nathan, user@spark.apache.org
> Subject: Re: Spark 1.4 DataFrame Parquet file writing - missing random 
> rows/partitions
>
> Hi Nathan,
>
> Thanks a lot for the detailed report, especially the information about 
> non-consecutive part numbers. It's confirmed to be a race condition 
> bug, and I've just filed https://issues.apache.org/jira/browse/SPARK-8406 
> to track it. We'll deliver a fix ASAP, and it will be included in 1.4.1.
>
> Best,
> Cheng
>
> On 6/16/15 12:30 AM, Nathan McCarthy wrote:
>> Hi all,
>>
>> Looks like DataFrame Parquet writing is very broken in Spark 1.4.0. 
>> We had no problems with Spark 1.3.
>>
>> We're trying to save a DataFrame with *569610608* rows:
>>
>> dfc.write.format("parquet").save("/data/map_parquet_file")
>>
>> We get random results between runs. Caching the DataFrame in memory 
>> makes no difference. It looks like the write misses some of the RDD 
>> partitions. The RDD has *6750* partitions, but when we write it out 
>> we get fewer files than partitions, and when we read the data back 
>> in and run a count, we get a smaller number of rows.
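>>
>> A rough sketch of one way to cross-check this (assuming a spark-shell 
>> session, so sc is in scope; the names expected, fs and written below 
>> are just illustrative):
>>
>> import org.apache.hadoop.fs.{FileSystem, Path}
>>
>> // Number of partitions the write should turn into part files (6750 here).
>> val expected = dfc.rdd.partitions.length
>>
>> // Count the .parquet part files that actually landed on HDFS.
>> val fs = FileSystem.get(sc.hadoopConfiguration)
>> val written = fs.listStatus(new Path("/data/map_parquet_file"))
>>   .count(_.getPath.getName.endsWith(".parquet"))
>>
>> println(s"expected $expected part files, found $written")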
>>
>> I’ve tried counting the rows in all different ways. All return the 
>> same result, *560214031* rows, missing about 9.4 million rows (~1.6%).
>>
>> qc.read.parquet("/data/map_parquet_file").count
>> qc.read.parquet("/data/map_parquet_file").rdd.count
>> qc.read.parquet("/data/map_parquet_file")
>>   .mapPartitions { itr => var c = 0; itr.foreach(_ => c += 1); Seq(c).toIterator }
>>   .reduce(_ + _)
>>
>> Looking at the files on HDFS, there are /6643/ .parquet files, so 107 
>> partitions are missing (about 1.6%).
>>
>> Writing the same cached DF out again to a new path gives *6717* files 
>> on HDFS (33 files missing, about 0.5%):
>>
>> dfc.write.parquet("/data/map_parquet_file_2")
>>
>> And we get *566670107* rows back (about 3 million rows missing, ~0.5%):
>>
>> qc.read.parquet("/data/map_parquet_file_2").count
>>
>> Writing the same DF out to JSON produces the expected number (*6750*) 
>> of part files and returns the right number of rows, /569610608/:
>>
>> dfc.write.format("json").save("/data/map_parquet_file_3")
>> qc.read.format("json").load("/data/map_parquet_file_3").count
>>
>> One thing to note is that the Parquet part files on HDFS don't have 
>> the normal sequential part numbers that the JSON output (and the 
>> Parquet output in Spark 1.3) have; see the sketch after the listing 
>> below.
>>
>> part-r-06151.gz.parquet   part-r-118401.gz.parquet
>> part-r-146249.gz.parquet  part-r-196755.gz.parquet
>> part-r-35811.gz.parquet   part-r-55628.gz.parquet
>> part-r-73497.gz.parquet   part-r-97237.gz.parquet
>> part-r-06161.gz.parquet   part-r-118406.gz.parquet
>> part-r-146254.gz.parquet  part-r-196763.gz.parquet
>> part-r-35826.gz.parquet   part-r-55647.gz.parquet
>> part-r-73500.gz.parquet   _SUCCESS
>>
>> We are using MapR 4.0.2 for HDFS.
>>
>> Any ideas?
>>
>> Cheers,
>> Nathan
>>
>

