spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Silvio Fiorito <silvio.fior...@granturing.com>
Subject Re: Physical plan for windows and joins - how to know which is faster?
Date Wed, 09 Nov 2016 15:57:44 GMT
Hi Jacek,


I haven't played with 2.1.0 yet, so not sure how much more optimized Window functions are
compared to 1.6 and 2.0.


However, one thing I do see in the self-join is a broadcast. So there's going to be a need
broadcast the results of the groupBy out to the executors before it can do the join. In both
cases it's shuffling the data (for the groupBy or the Window).


Have you tried running both queries to see? Would be interesting to test it on varying data
volumes as well (e.g. what if there's no broadcast).


Thanks,

Silvio

________________________________
From: Jacek Laskowski <jacek@japila.pl>
Sent: Wednesday, November 9, 2016 7:36:47 AM
To: user
Subject: Physical plan for windows and joins - how to know which is faster?

Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and
explain'ing two queries with WindowSpec and inner join I found the
following plans and am wondering if you could help me to judge which
query could be faster.

What else would you ask for to be able to answer the question of one
being more efficient than the other?

Just by looking at the Spark's "stack traces" of the queries one could
say that windowed variant (first one) is gonna be faster (as there are
less physical operators) yet top-level Window operator is not
codegened so it might be misleading.

I'd appreciate your help to get me better at reading such trees. Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60,
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS
sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
      +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS
ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   :     +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0,
int, true] as bigint)))
      +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15
as bigint))])
         +- Exchange hashpartitioning(ID % 3#681, 200)
            +- *HashAggregate(keys=[ID % 3#681],
functions=[partial_sum(cast(id#15 as bigint))])
               +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
                  +- *Filter isnotnull((_1#12 % 3))
                     +- LocalTableScan [_1#12, _2#13]

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Mime
View raw message