spark-user mailing list archives

From Mike Buck <Mike.B...@pb.com>
Subject Question about Spark, Inner Join and Delegation to a Parquet Table
Date Mon, 02 Jul 2018 13:23:22 GMT
I have a question about Spark and how it delegates filters to a Parquet-backed table. I have
two tables in Hive in Parquet format. Table1 has four columns of type double and table2
has two columns of type double. I am doing an INNER JOIN as follows:

SELECT table1.name FROM table1 INNER JOIN table2
ON table2.x BETWEEN table1.xmin AND table1.xmax
AND table2.y BETWEEN table1.ymin AND table1.ymax
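
For context, the plan below was produced roughly like this (a sketch; it assumes a Hive-enabled SparkSession named spark with table1 and table2 already registered in the metastore):

// Sketch: generate the physical plan for the join query.
// Assumes a Hive-enabled SparkSession named `spark` and both
// tables already present in the Hive metastore.
val joined = spark.sql(
  """SELECT table1.name FROM table1 INNER JOIN table2
    |ON table2.x BETWEEN table1.xmin AND table1.xmax
    |AND table2.y BETWEEN table1.ymin AND table1.ymax""".stripMargin)
joined.explain()  // prints the physical plan, including PushedFilters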

I noticed that the execution plan as reported by Spark only delegates the IsNotNull filters
to the tables and not any of the range filters:

== Physical Plan ==
*Project [name#0]
+- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16))
   :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16]
   :  +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && isnotnull(ymax#16)) && isnotnull(xmax#15))
   :     +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://xxxx.xxxx.xxxx.xxxx:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: struct<name:string,xmin:double,ymin:double,xmax:double,ymax:double>
   +- BroadcastExchange IdentityBroadcastMode
      +- *Project [x#36, y#37]
         +- *Filter (isnotnull(y#37) && isnotnull(x#36))
            +- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://xxxx.xxxx.xxxx.xxxx:8020/apps/hive/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: struct<x:double,y:double>
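
To rule out a formatting issue in explain(), the pushed filters can also be read off the plan directly (a sketch against Spark 2.x internals; FileSourceScanExec is not a stable API, so treat this as an assumption about the version):

// Sketch: pull the PushedFilters entries straight out of the physical
// plan instead of eyeballing explain() output. Relies on the Spark 2.x
// internal class FileSourceScanExec.
import org.apache.spark.sql.execution.FileSourceScanExec

joined.queryExecution.executedPlan.collect {
  case scan: FileSourceScanExec => scan.metadata.getOrElse("PushedFilters", "")
}.foreach(println)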

However, when doing a filter against a single table, the filter is delegated to the table:

SELECT pbkey FROM table1 WHERE table1.xmin > -73.4454183678

== Physical Plan ==
CollectLimit 21
+- *Project [pbkey#150]
   +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678))
      +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://xxxx.xxxx.xxxx.xxxx:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), GreaterThan(xmin,-73.4454183678)], ReadSchema: struct<pbkey:string,xmin:double>
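
For what it's worth, the equivalent DataFrame-API query shows the same pushdown (a sketch under the same session assumptions; a plain column-versus-literal comparison like this is the kind of predicate the Parquet source can accept as a pushed filter):

// Sketch: the single-table predicate expressed via the DataFrame API.
import spark.implicits._

val filtered = spark.table("table1")
  .where($"xmin" > -73.4454183678)
  .select("pbkey")
filtered.explain()  // shows GreaterThan(xmin,...) under PushedFilters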

So the question is: does Spark delegate filters in a join condition to a Parquet table, or
is this an error in the "explain plan" output?

Thanks.

