spark-issues mailing list archives

From "Dongjoon Hyun (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-26352) join reordering should not change the order of output attributes
Date Mon, 02 Mar 2020 08:55:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-26352:
----------------------------------
    Affects Version/s: 2.1.0
                       2.2.0

> join reordering should not change the order of output attributes
> ----------------------------------------------------------------
>
>                 Key: SPARK-26352
>                 URL: https://issues.apache.org/jira/browse/SPARK-26352
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0
>            Reporter: Kris Mok
>            Assignee: Kris Mok
>            Priority: Major
>              Labels: correctness
>             Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} performs join reordering on inner joins. It was introduced by SPARK-12032 in December 2015.
> After reordering the joins, though, the rule did not check whether the column order (in terms of the {{output}} attribute list) was still the same as before. Thus, it is possible to have a mismatch between the reordered column order and the schema that a DataFrame thinks it has.
> This can be demonstrated with the example:
> {code:none}
> spark.sql("create table table_a (x int, y int) using parquet")
> spark.sql("create table table_b (i int, j int) using parquet")
> spark.sql("create table table_c (a int, b int) using parquet")
> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> {code}
> Here is the schema the DataFrame reports:
> {code:none}
> scala> df.printSchema
> root
>  |-- x: integer (nullable = true)
>  |-- y: integer (nullable = true)
>  |-- i: integer (nullable = true)
>  |-- j: integer (nullable = true)
>  |-- a: integer (nullable = true)
>  |-- b: integer (nullable = true)
> {code}
> Here is the output of the optimized plan after join reordering:
> {code:none}
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- a: integer
> |-- b: integer
> |-- i: integer
> |-- j: integer
> {code}
> If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule exclusion feature), the column order is back to normal:
> {code:none}
> scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
> scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- i: integer
> |-- j: integer
> |-- a: integer
> |-- b: integer
> {code}
> Note that this column ordering problem leads to data corruption, and can manifest itself in various symptoms:
> * Silent data corruption: if the reordered columns happen to have matching types or sufficiently compatible types (e.g. all fixed-length primitive types are considered "sufficiently compatible" in an UnsafeRow), then the resulting data is wrong but might not trigger any alarms immediately.
> * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}}, or even SIGSEGVs.
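
The silent-corruption symptom can be illustrated with a small model in plain Scala. This is only a sketch, not Spark code: the object {{ColumnOrderMismatch}} and its members are hypothetical names, and a row is modelled as a flat vector of ints read by position. It assumes the reader resolves a column through the order the DataFrame reports while the values are laid out in the order of the optimized plan, using the two orders from the example in the issue.

```scala
// Hypothetical model (not Spark code): a row is a flat vector read by position.
object ColumnOrderMismatch {
  // Column order the DataFrame reports, as printed by df.printSchema in the issue.
  val reportedSchema: Vector[String] = Vector("x", "y", "i", "j", "a", "b")

  // Column order of the optimized plan after join reordering, as shown in the issue.
  val physicalOrder: Vector[String] = Vector("x", "y", "a", "b", "i", "j")

  // Lay out one row in physical order from name -> value pairs.
  def layout(values: Map[String, Int]): Vector[Int] = physicalOrder.map(values)

  // Read a column by the index it has in the reported schema,
  // which is what a purely positional reader effectively does.
  def readByReportedSchema(row: Vector[Int], column: String): Int =
    row(reportedSchema.indexOf(column))

  // Simple guard: do both sides agree on the column order?
  def sameOrder: Boolean = reportedSchema == physicalOrder

  def main(args: Array[String]): Unit = {
    // Values satisfy the join condition a = x and b = i from the example query.
    val row = layout(Map("x" -> 1, "y" -> 2, "i" -> 3, "j" -> 4, "a" -> 1, "b" -> 3))
    println(s"sameOrder = $sameOrder")
    println(s"j read via reported schema = ${readByReportedSchema(row, "j")}")
  }
}
```

In this model, reading {{j}} returns 3 (the value stored for {{b}}) instead of 4, and nothing fails because every column has the same type. That is the "matching types" case described in the first bullet above.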



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


