Hi Jacek,


I haven't played with 2.1.0 yet, so not sure how much more optimized Window functions are compared to 1.6 and 2.0.


However, one thing I do see in the self-join is a broadcast. So there's going to be a need broadcast the results of the groupBy out to the executors before it can do the join. In both cases it's shuffling the data (for the groupBy or the Window).


Have you tried running both queries to see? Would be interesting to test it on varying data volumes as well (e.g. what if there's no broadcast).


Thanks,

Silvio


From: Jacek Laskowski <jacek@japila.pl>
Sent: Wednesday, November 9, 2016 7:36:47 AM
To: user
Subject: Physical plan for windows and joins - how to know which is faster?
 
Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and
explain'ing two queries with WindowSpec and inner join I found the
following plans and am wondering if you could help me to judge which
query could be faster.

What else would you ask for to be able to answer the question of one
being more efficient than the other?

Just by looking at the Spark's "stack traces" of the queries one could
say that windowed variant (first one) is gonna be faster (as there are
less physical operators) yet top-level Window operator is not
codegened so it might be misleading.

I'd appreciate your help to get me better at reading such trees. Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60,
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS
sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
      +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS
ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   :     +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0,
int, true] as bigint)))
      +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15
as bigint))])
         +- Exchange hashpartitioning(ID % 3#681, 200)
            +- *HashAggregate(keys=[ID % 3#681],
functions=[partial_sum(cast(id#15 as bigint))])
               +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
                  +- *Filter isnotnull((_1#12 % 3))
                     +- LocalTableScan [_1#12, _2#13]

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org