spark-issues mailing list archives

From "Eyal Farago (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)
Date Wed, 22 Aug 2018 20:55:00 GMT
Eyal Farago created SPARK-25203:
-----------------------------------

             Summary: spark sql, union all does not propagate child partitioning (when possible)
                 Key: SPARK-25203
                 URL: https://issues.apache.org/jira/browse/SPARK-25203
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, SQL
    Affects Versions: 2.3.0, 2.2.0, 2.4.0
            Reporter: Eyal Farago


In spark-sql, UNION ALL does not propagate partitioning when all child plans share the same partitioning. This introduces unnecessary Exchange nodes when a parent operator requires a distribution that the children's partitioning already satisfies.

 
{code:java}
CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by c1;
CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 distribute by c11;

create or REPLACE TEMPORARY VIEW t1DU as
select * from t1D1
UNION ALL
select * from t1D2;

EXPLAIN select * from t1DU distribute by c1;

== Physical Plan ==
Exchange hashpartitioning(c1#x, 200)
+- Union
   :- Exchange hashpartitioning(c1#x, 200)
   :  +- LocalTableScan [c1#x, c2#x]
   +- Exchange hashpartitioning(c11#x, 200)
      +- LocalTableScan [c11#x, c2#x]
{code}

The Exchange introduced in the last query is unnecessary, since the unioned data is already partitioned by column _c1_. In fact, the equivalent RDD operation identifies this scenario and introduces a PartitionerAwareUnionRDD, which maintains the children's shared partitioner.

I suggest modifying org.apache.spark.sql.execution.UnionExec to override _outputPartitioning_ so that it identifies a common partitioning among the child plans and uses it, falling back to the default implementation otherwise.
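The core of that check could look something like the following self-contained sketch. This is a simplified model, not the real org.apache.spark.sql.catalyst.plans.physical classes: partitionings are described by output-column ordinals, and the name _commonPartitioning_ is hypothetical. A real implementation would also have to map each child's partitioning expressions onto the union's output attributes (e.g. the second child's _c11_ corresponds positionally to _c1_ in the union's output).

{code:scala}
// Simplified stand-in for Spark's Partitioning hierarchy (hypothetical model).
// Columns are identified by ordinal position in the plan's output, which is
// what makes the two children in the example above comparable.
sealed trait Partitioning
case class HashPartitioning(columns: Seq[Int], numPartitions: Int) extends Partitioning
case object UnknownPartitioning extends Partitioning

// If every child reports the same partitioning, the union can claim it too;
// otherwise fall back to the default (unknown) partitioning.
def commonPartitioning(children: Seq[Partitioning]): Partitioning =
  children.distinct match {
    case Seq(p: HashPartitioning) => p
    case _                        => UnknownPartitioning
  }
{code}

With this, both children in the example report hash partitioning on output column 0 with 200 partitions, so the union would report that same partitioning and the outer Exchange could be elided.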

Furthermore, it seems the current implementation does not properly cluster the data:

{code:java}
select *, spark_partition_id() as P  from t1DU distribute by c1
-- !query 15 schema
struct<c1:int,c2:string,P:int>
-- !query 15 output
1	a	43
2	a	374
2	b	174
3	b	251
{code}

Notice that rows with _c1=2_ land in two different partitions, 174 and 374, even though the query asked for distribution by _c1_.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

