spark-issues mailing list archives

From "Cheng Lian (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-16344) Array of struct with a single field name "element" can't be decoded from Parquet files written by Spark 1.6+
Date Sun, 10 Jul 2016 08:07:10 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369481#comment-15369481
] 

Cheng Lian commented on SPARK-16344:
------------------------------------

Thanks to [~rdblue]'s comment about why there are two different {{isElementType}} methods in
parquet-avro, I finally realized that Spark SQL doesn't really need two {{isElementType}} methods
the way parquet-avro does, and came up with a proper fix for this issue in Spark SQL (already
pushed to [PR #14014|https://github.com/apache/spark/pull/14014]).

Here I'm trying to write down my understanding for future reference.

One important difference between parquet-avro and Spark SQL 1.6+ is how the requested Parquet
schema is set in {{ReadSupport.init()}}.
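
For reference, here is a minimal Scala sketch of the {{ReadSupport}} contract showing where the requested schema gets decided. The class name below is made up for illustration; Spark's real implementation is {{ParquetReadSupport}}:

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType

// Made-up class name; Spark's real implementation is ParquetReadSupport.
class SketchReadSupport extends ReadSupport[AnyRef] {

  // init() is where a ReadSupport decides which columns to request from the file.
  override def init(context: InitContext): ReadContext = {
    // Actual Parquet schema of the file being read:
    val fileSchema: MessageType = context.getFileSchema
    // A real implementation would project/clip here; this sketch requests everything.
    val requestedSchema: MessageType = fileSchema
    new ReadContext(requestedSchema)
  }

  // prepareForRead() then builds row converters against the requested schema;
  // it is omitted here because it is irrelevant to how the requested schema is chosen.
  override def prepareForRead(
      conf: Configuration,
      keyValueMetaData: java.util.Map[String, String],
      fileSchema: MessageType,
      readContext: ReadContext): RecordMaterializer[AnyRef] =
    throw new UnsupportedOperationException("omitted in this sketch")
}
{code}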

In parquet-avro, the requested Parquet schema can be set in one of two ways:

# If no requested Avro schema is specified, the full Parquet schema of the file to be read is used as the requested schema.
# If a requested Avro schema is specified, parquet-avro converts the requested Avro schema into a Parquet schema using {{AvroSchemaConverter}}, and uses the converted Parquet schema as the requested Parquet schema (sketched below).
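
A rough Scala sketch of the second path (the record name and field in the projection schema below are made up for illustration):

{code}
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroReadSupport, AvroSchemaConverter}
import org.apache.parquet.schema.MessageType

val conf = new Configuration()

// A made-up requested Avro schema (projection) asking only for an int array field "f".
val requestedAvroSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Projection", "fields": [
    |  {"name": "f", "type": {"type": "array", "items": "int"}}
    |]}""".stripMargin)

// Register the projection for the read path ...
AvroReadSupport.setRequestedProjection(conf, requestedAvroSchema)

// ... and parquet-avro derives the requested *Parquet* schema by converting the Avro schema,
// regardless of which list layout the physical file actually uses.
val requestedParquetSchema: MessageType = new AvroSchemaConverter().convert(requestedAvroSchema)
println(requestedParquetSchema)
{code}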

The 2nd case is risky because the converted Parquet schema may not conform to the "flavor"
of the actual schema of the physical Parquet file. For example, the file might have been created
with parquet-protobuf and use {{repeated int32 f;}} to represent an integer array, while
parquet-avro uses either a 2-level or a 3-level layout, as illustrated below. This inconsistency
limits the interoperability of parquet-avro. Although the 2nd {{isElementType}} method in
{{AvroRecordConverter}} reconciles some of these corner cases by comparing the requested schema
with the expected Avro schema, the issue is still not completely resolved.
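
For illustration (nullability details aside), the same logical integer-array field {{f}} could appear in a file in any of these layouts:

{noformat}
// parquet-protobuf (old style): bare repeated primitive, no LIST annotation
repeated int32 f;

// parquet-avro legacy 2-level layout
optional group f (LIST) {
  repeated int32 array;
}

// standard 3-level layout
optional group f (LIST) {
  repeated group list {
    optional int32 element;
  }
}
{noformat}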

In Spark SQL, to provide better interoperability, the requested Parquet schema generated for
each physical file is always tailored from the actual schema of the file to be read. The
tailoring works as follows (see the simplified sketch below):

# The query execution engine always provides a requested schema {{cs}} in the form of a Catalyst {{StructType}}.
# {{ParquetReadSupport.init()}} calls {{ParquetReadSupport.clipParquetSchema()}} to tailor the Parquet schema {{ps}} of the physical file using {{cs}}.
# All column paths that exist in {{cs}} but are missing from {{ps}} are added to {{ps}}.
# All column paths that exist in {{ps}} but are missing from {{cs}} are removed from {{ps}}.
# All backward-compatibility rules are properly considered while adding/removing column paths to/from {{ps}}.

The above work was done in [PR #8509|https://github.com/apache/spark/pull/8509].
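
A deliberately simplified Scala sketch of the clipping idea (structs only; {{toParquet}} below is a stand-in for the Catalyst-to-Parquet field conversion, and none of the LIST/MAP backward-compatibility handling that the real {{clipParquetSchema()}} performs is shown):

{code}
import org.apache.parquet.schema.{GroupType, Type}
import org.apache.spark.sql.types.{StructField, StructType}

// Simplified sketch only -- not the actual implementation.
def clipStruct(
    parquetRecord: GroupType,        // ps: the file's schema (or a nested group of it)
    catalystStruct: StructType,      // cs: the requested Catalyst schema
    toParquet: StructField => Type   // stand-in for Catalyst-to-Parquet conversion
  ): Seq[Type] = {
  catalystStruct.fields.toSeq.map { field =>
    (field.dataType, parquetRecord.containsField(field.name)) match {
      case (nested: StructType, true) =>
        // Column path exists in both cs and ps: keep the file's layout, but recurse
        // so that only the requested sub-fields survive.
        val parquetField = parquetRecord.getType(field.name).asGroupType()
        parquetField.withNewFields(clipStruct(parquetField, nested, toParquet): _*)
      case (_, true) =>
        parquetRecord.getType(field.name)  // keep the file's version of the column
      case (_, false) =>
        toParquet(field)                   // in cs but missing from ps: add it
    }
  }
  // Column paths in ps that cs doesn't mention are dropped simply by not emitting them.
}
{code}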

In this way, it's guaranteed that the tailored requested schema always fits the file to be
read, so we don't really need the 2nd {{isElementType}} in {{ParquetRowConverter}} to do any
further reconciliation. The real cause of this JIRA ticket is that {{ParquetRowConverter.isElementType}}
makes a different decision from {{ParquetSchemaConverter.isElementType}}. For the schema in
question

{noformat}
optional group f (LIST) {
  repeated group list {
    optional group element {
      optional int32 element;
    }
  }
}
{noformat}

{{ParquetSchemaConverter.isElementType}} treats it as a standard 3-level layout, which is correct,
while {{ParquetRowConverter.isElementType}} treats it as a 2-level layout. The generated row
converter therefore doesn't conform to the requested schema, which causes the problem.
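
This is not the actual 1.6 code, but a sketch of the shape of the heuristic that goes wrong: the row converter guesses "2-level layout" whenever the repeated group's single field has the same name as the single field of the expected Catalyst struct, which is exactly what happens here because that name is "element":

{code}
import org.apache.parquet.schema.{GroupType, Type}
import org.apache.spark.sql.types.{DataType, StructType}

// Sketch only -- not the actual ParquetRowConverter.isElementType implementation.
def looksLikeLegacyTwoLevel(repeatedType: Type, expectedElementType: DataType): Boolean =
  (repeatedType, expectedElementType) match {
    case (g: GroupType, StructType(Array(f)))
        if g.getFieldCount == 1 && g.getFieldName(0) == f.name =>
      // For the schema above: the repeated group "list" has the single field "element",
      // and the expected Catalyst struct's only field is also named "element", so this
      // check fires and the standard 3-level layout is misread as a 2-level layout.
      true
    case _ => false
  }
{code}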

By removing {{ParquetRowConverter.isElementType}}, we can avoid these inconsistent decisions.
To test whether a repeated field inside a LIST-annotated group corresponds to the element type
in {{ParquetRowConverter}}, we only need to convert the repeated type into a Catalyst type and
check whether the converted type matches the actual Catalyst array element type.
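
As a sketch (with {{toCatalyst}} standing in for the Parquet-to-Catalyst conversion performed by the schema converter, and nullability subtleties glossed over), the check reduces to:

{code}
import org.apache.parquet.schema.Type
import org.apache.spark.sql.types.DataType

// Sketch only: the repeated type is the element itself (legacy 2-level layout) iff
// converting it yields the expected Catalyst element type; otherwise it is just the
// "list" wrapper group of the standard 3-level layout.
def isElementType(
    repeatedType: Type,
    expectedElementType: DataType,
    toCatalyst: Type => DataType): Boolean =
  toCatalyst(repeatedType) == expectedElementType
{code}

For the schema in question, converting the repeated group {{list}} yields a struct whose single field is itself a struct, which does not match the expected single-field struct element type, so the repeated group is correctly treated as the 3-level "list" wrapper rather than as the element itself.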

The above fix is also equivalent to the proper fix [~rdblue] mentioned, since the schema
conversion process properly recurses into the data type sub-tree. The Hive case mentioned
earlier is also fixed by this approach.

cc @yhuai


> Array of struct with a single field name "element" can't be decoded from Parquet files written by Spark 1.6+
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16344
>                 URL: https://issues.apache.org/jira/browse/SPARK-16344
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>
> This is a weird corner case. Users may hit this issue if they have a schema that
> # has an array field whose element type is a struct, and
> # the struct has one and only one field, and
> # that field is named as "element".
> The following Spark shell snippet for Spark 1.6 reproduces this bug:
> {code}
> case class A(element: Long)
> case class B(f: Array[A])
> val path = "/tmp/silly.parquet"
> Seq(B(Array(A(42)))).toDF("f0").write.mode("overwrite").parquet(path)
> val df = sqlContext.read.parquet(path)
> df.printSchema()
> // root
> //  |-- f0: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- element: long (nullable = true)
> df.show()
> {code}
> Exception thrown:
> {noformat}
> org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/silly.parquet/part-r-00007-e06db7b0-5181-4a14-9fee-5bb452e883a0.gz.parquet
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>         at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>         at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter"
>         at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:37)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:266)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>         ... 26 more
> {noformat}
> Spark 2.0.0-SNAPSHOT and Spark master also suffer from this issue. To reproduce it with these versions, just replace {{sqlContext}} in the above snippet with {{spark}}.
> The reason behind this is related to the Parquet backwards-compatibility rules for LIST types defined in the [parquet-format spec|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists].
> The Spark SQL schema shown above
> {noformat}
> root
>  |-- f0: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- element: long (nullable = true)
> {noformat}
> is equivalent to the following SQL type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> According to the parquet-format spec, the standard layout of a LIST-like structure is a 3-level layout:
> {noformat}
> <list-repetition> group <name> (LIST) {
>   repeated group list {
>     <element-repetition> <element-type> element;
>   }
> }
> {noformat}
> Thus, the standard representation of the aforementioned SQL type should be:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {    (1)
>         optional int64 element;   (2)
>       }
>     }
>   }
> }
> {noformat}
> Note that the two "element" fields are different:
> - The {{group}} field "element" at (1) is a "container" for the list element type. This is defined as part of the parquet-format spec.
> - The {{int64}} field "element" at (2) corresponds to the {{element}} field of the case class {{A}} defined above.
> However, for historical reasons, various existing systems do not conform to the parquet-format spec and may write LIST structures in a non-standard layout. For example, parquet-avro and parquet-thrift use 2-level layouts like the following:
> {noformat}
> // parquet-avro style
> <list-repetition> group <name> (LIST) {
>   repeated <element-type> array;
> }
> // parquet-thrift style
> <list-repetition> group <name> (LIST) {
>   repeated <element-type> <name>_tuple;
> }
> {noformat}
> To preserve backward compatibility, the parquet-format spec defines a set of [backward-compatibility rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules] that also recognize these patterns.
> Unfortunately, these backward-compatibility rules make the Parquet schema mentioned above ambiguous:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {
>         optional int64 element;
>       }
>     }
>   }
> }
> {noformat}
> When interpreted using the standard 3-level layout, it is the expected type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> When interpreted using the legacy 2-level layout, it is the unexpected type:
> {noformat}
> // When interpreted as legacy 2-level layout
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: STRUCT<element: BIGINT>>
>   >
> >
> {noformat}
> This is because the nested struct field happens to be named "element", which is also the dedicated name of the element-type "container" group in the standard 3-level layout, leading to the ambiguity.
> Currently, Spark 1.6.x, 2.0.0-SNAPSHOT, and master choose the 2nd interpretation. We can fix this issue by giving the standard 3-level layout a higher priority when trying to match schema patterns.
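> A sketch of what giving the standard 3-level layout a higher priority could look like (illustrative only, not Spark's actual code):
> {code}
> import org.apache.parquet.schema.Type
>
> // Illustrative only: recognize the standard wrapper group first, and fall back to
> // the legacy 2-level interpretation only when the standard pattern doesn't apply.
> def isStandardListWrapper(repeatedType: Type): Boolean =
>   !repeatedType.isPrimitive &&
>     repeatedType.getName == "list" &&
>     repeatedType.asGroupType().getFieldCount == 1 &&
>     repeatedType.asGroupType().getFieldName(0) == "element"
>
> def treatRepeatedTypeAsElement(repeatedType: Type): Boolean =
>   !isStandardListWrapper(repeatedType)  // prefer the 3-level reading when it matches
> {code}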



