spark-dev mailing list archives

From Felix Cheung <felixcheun...@hotmail.com>
Subject Re: Dataset schema incompatibility bug when reading column partitioned data
Date Sat, 13 Apr 2019 17:35:26 GMT
I kinda agree it is confusing when a parameter is not used...

________________________________
From: Ryan Blue <rblue@netflix.com.INVALID>
Sent: Thursday, April 11, 2019 11:07:25 AM
To: Bruce Robbins
Cc: Dávid Szakállas; Spark Dev List
Subject: Re: Dataset schema incompatibility bug when reading column partitioned data


I think the confusion is that the schema passed to spark.read is not a projection schema.
I don’t think it is even used in this case because the Parquet dataset has its own schema.
You’re getting the schema of the table. I think the correct behavior is to reject a user-specified
schema in this case.
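
As a rough illustration of the mismatch under discussion, here is a minimal sketch of a guard that compares the column order a caller asked for with the order a reader actually returned. The helper name `sameColumnOrder` is hypothetical and not part of Spark; the two schemas are built by hand to mirror the example from the original report, so no Parquet files are read.

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical guard (not a Spark API): true only when both schemas
// list the same column names in the same order.
def sameColumnOrder(requested: StructType, actual: StructType): Boolean =
  requested.fieldNames.sameElements(actual.fieldNames)

// The schema passed to spark.read.schema(...) in the original report.
val requested = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))

// Hand-built stand-in for the column order the report shows coming back
// from the partitioned read: the partition column _1 ends up last.
val actual = StructType(Seq(StructField("_2", IntegerType), StructField("_1", IntegerType)))

val mismatch = !sameColumnOrder(requested, actual)
```

In practice `actual` would be the `schema` of the DataFrame returned by the read; a caller could fail fast or reorder columns when `mismatch` is true.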

On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins <bersprockets@gmail.com> wrote:
I see a Jira:

https://issues.apache.org/jira/browse/SPARK-21021

On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas <david.szakallas@gmail.com> wrote:
+dev for more visibility. Is this a known issue? Is there a plan for a fix?

Thanks,
David

Begin forwarded message:

From: Dávid Szakállas <david.szakallas@gmail.com>
Subject: Dataset schema incompatibility bug when reading column partitioned data
Date: 2019. March 29. 14:15:27 CET
To: user@spark.apache.org

We observed the following bug on Spark 2.4.0:


scala> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")

scala> import org.apache.spark.sql.types._

scala> val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))

scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
+---+---+
| _2| _1|
+---+---+
|  2|  1|
+---+---+

That is, when reading column-partitioned Parquet files, the explicitly specified schema is
not adhered to; instead, the partitioning columns are appended to the end of the column list.
This is quite a severe issue, as some operations, such as union, fail if the columns are in a
different order in the two datasets. Thus we have to work around the issue with a select:

// ds is the Dataset returned by the read above
val columnNames = schema.fields.map(_.name)
ds.select(columnNames.head, columnNames.tail: _*)
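
The workaround above can be sketched end to end as follows. This is a minimal illustration, not a definitive fix: the helper `reorderTo` is a hypothetical name, the local `SparkSession` settings are assumptions, and the misordered input is built in memory as a stand-in for the frame read back from partitioned Parquet.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Assumed local session for the sketch; in spark-shell a session already exists.
val spark = SparkSession.builder().master("local[1]").appName("reorder-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: select the columns in the order the schema declares them.
def reorderTo(df: DataFrame, schema: StructType): DataFrame =
  df.select(schema.fieldNames.map(col): _*)

val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))

// In-memory stand-in for the partitioned read: the partition column _1 comes last.
val misordered = Seq((2, 1)).toDF("_2", "_1")

// Restore the declared order before converting to a typed Dataset.
val fixed = reorderTo(misordered, schema).as[(Int, Int)]

// A second Dataset whose columns are already in the order _1, _2.
val other = Seq((3, 4)).toDS()

// union resolves columns by position, so both sides must share the same order.
val combined = fixed.union(other)
```

For unions specifically, `Dataset.unionByName` (available since Spark 2.3) resolves columns by name rather than by position, so it may avoid the explicit reordering; the select is still needed wherever positional order matters.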


Thanks,
David Szakallas
Data Engineer | Whitepages, Inc.



--
Ryan Blue
Software Engineer
Netflix
