spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
Date Wed, 23 Mar 2016 17:14:19 GMT
The broadcast hint does not work as expected in this case. Could you
also show the logical plan via 'explain(true)'?
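For reference, a minimal sketch of dumping the plans (using the join2 DataFrame from the quoted mail below):

```scala
// explain(true) prints all four plans -- parsed, analyzed, optimized logical,
// and physical -- so you can check whether the BroadcastHint node survives
// the optimizer; it shows up in the logical plan as "BroadcastHint".
join2.explain(true)   // extended = true: logical + physical plans
// join2.explain()    // physical plan only, as dumped further down the thread
```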

On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8964@hotmail.com> wrote:
>
> So I am testing this code to understand the "broadcast" feature of DF on Spark 1.6.1.
> This time I did not disable "tungsten". Everything is at the default value, except the memory and cores of my job on 1.6.1.
>
> I am testing the join2 case
>
> val join2 = historyRaw.join(broadcast(trialRaw),
>   trialRaw("visid_high") === historyRaw("visid_high") &&
>   trialRaw("visid_low") === historyRaw("visid_low") &&
>   trialRaw("date_time") > historyRaw("date_time"))
>
> And here is the DAG visualization at runtime of my testing job:
>
> [DAG visualization screenshot not preserved in the archive]
>
> So now, I don't understand how "broadcast" works on a DataFrame in Spark. I originally thought it would be the same as "mapjoin" in Hive, but can someone explain the DAG above to me?
>
> I have one day of data, about a 1.5G compressed parquet file, filtered by "instr(loadRaw("event_list"), "202") > 0", which outputs only about 1494 rows (very small); that is the "trialRaw" DF in my example.
> Stage 3 has a filter, which I thought was for the trialRaw data, but the stage statistics don't match the data. I don't know why the input is only 78M and the shuffle write is about 97.6KB.
>
> The historyRaw is about 90 days of history data, which should be about 100G, so it looks like stage 4 is scanning it.
>
> Now, my original thought was that the small data would be broadcast to all the nodes, and most of the history data would be filtered out by the join keys; at least that is what "mapjoin" in Hive would do. But from the DAG above, I don't see it working this way.
> It looks more like Spark uses a SortMerge join, shuffling both datasets across the network and filtering on the "reducer" side by the join keys to get the final output. But that is not what a "broadcast" join is supposed to do, correct?
> The last stage is very slow until it reaches and processes all the history data, with "shuffle read" reaching 720G, as shown below.
>
> One thing I notice: with tungsten enabled, the shuffle write volume on stage 4 is larger (720G) than with tungsten disabled (506G) in my original run, for exactly the same input. That is an interesting point; does anyone have an idea about this?
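>
> A sketch of the two settings that control this behavior in Spark 1.5/1.6 (the values below are the documented defaults; whether they apply here is an assumption worth verifying):

```scala
// Spark broadcasts a join side only when the plan can become a
// BroadcastHashJoin. Two settings worth checking in Spark 1.5/1.6:

// 1. Size threshold below which a table is auto-broadcast (default 10 MB).
//    broadcast(df) hints past this limit, but only the equi-join part of
//    the condition can drive the hash join.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

// 2. Number of post-shuffle partitions (default 200) -- this is where the
//    200 output files mentioned in this thread come from.
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// Note: the non-equi part of the condition (the date_time comparison) cannot
// be a hash-join key; it is applied as a Filter on top of the join, which is
// exactly what both physical plans in this thread show.
```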
>
>
> Overall, for my test case, a "broadcast" join is exactly the optimization I should use; but somehow I cannot make it behave the same way as Hive's "mapjoin", even in Spark 1.6.1.
>
> As I said, this is just a test case. We have some business cases where a "broadcast" join makes sense, but until I understand exactly how to make it work as I expect in Spark, I don't know what to do.
>
> Yong
>
> ________________________________
> From: java8964@hotmail.com
> To: user@spark.apache.org
> Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
> Date: Tue, 22 Mar 2016 13:08:31 -0400
>
>
> Please help me understand how "broadcast" works on a DF in Spark 1.5.2.
>
> Below are the 2 joins I tested and the physical plan I dumped:
>
> val join1 = historyRaw.join(trialRaw,
>   trialRaw("visid_high") === historyRaw("visid_high") &&
>   trialRaw("visid_low") === historyRaw("visid_low") &&
>   trialRaw("date_time") > historyRaw("date_time"))
> val join2 = historyRaw.join(broadcast(trialRaw),
>   trialRaw("visid_high") === historyRaw("visid_high") &&
>   trialRaw("visid_low") === historyRaw("visid_low") &&
>   trialRaw("date_time") > historyRaw("date_time"))
>
> join1.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
>   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>    Exchange hashpartitioning(visid_high#948L,visid_low#949L)
>     Scan ParquetRelation[hdfs://xxxxxxxx]
>   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>    Exchange hashpartitioning(visid_high#460L,visid_low#461L)
>     Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>      Filter (instr(event_list#105,202) > 0)
>       Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> join2.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  BroadcastHashJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L], BuildRight
>   Scan ParquetRelation[hdfs://xxxxxxxx]
>   Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>    Filter (instr(event_list#105,202) > 0)
>     Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> Obviously, the explain plans are different, but the performance and the job execution steps are almost exactly the same, as shown in the original picture in the email below.
> Keep in mind that I have to run with "--conf spark.sql.tungsten.enabled=false"; otherwise the execution plan will do the tungsten sort.
>
> Now what confuses me is the following:
> When using the broadcast join, the job still generates 3 stages, the same as SortMergeJoin, but I am not sure this makes sense.
> Ideally, with "broadcast", the first stage scans the "trialRaw" data using the filter (instr(event_list#105,202) > 0), which BTW filters out 99% of the data, then "broadcasts" the remaining data to all the nodes. Then it scans "historyRaw", filtering as it joins against the broadcast data. In the end, there is one more stage to save the data in the default "200" partitions. So ONLY 2 stages should be enough for this case. Why are there still 3 stages, just the same as "SortMergeJoin"? It looks like "broadcast" is not taking effect at all, yet the physical plan clearly shows the "Broadcast" hint.
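>
> For comparison, the Hive-style "mapjoin" behavior can be forced by hand with a broadcast variable; a sketch under the thread's column names (everything else is illustrative, and it assumes one trial row per key):

```scala
// Manual map-side join: collect the small side to the driver, broadcast it,
// and filter the big side with a plain map lookup -- no shuffle of historyRaw.
// This mirrors what a BroadcastHashJoin does internally.
val smallSide: Map[(Long, Long), Long] =
  trialRaw.select("visid_high", "visid_low", "date_time").rdd
    .map(r => ((r.getLong(0), r.getLong(1)), r.getLong(2)))
    .collect()
    .toMap   // simplification: one row per key; use a multimap otherwise

val bc = sc.broadcast(smallSide)

val joined = historyRaw.rdd.filter { r =>
  // keep only history rows whose keys exist in the broadcast map
  // and that satisfy the non-equi condition trial.date_time > history.date_time
  val key = (r.getAs[Long]("visid_high"), r.getAs[Long]("visid_low"))
  bc.value.get(key).exists(trialTime => trialTime > r.getAs[Long]("date_time"))
}
```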
>
> Thanks
>
> Yong
>
>
> ________________________________
> From: java8964@hotmail.com
> To: user@spark.apache.org
> Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
> Date: Fri, 18 Mar 2016 16:54:16 -0400
>
> Hi, Sparkers:
>
> I have some questions related to generating the parquet output in Spark 1.5.2.
>
> I have 2 data sets to join, and I know one is much smaller than the other, so I have the following test code:
>
> val loadRaw = sqlContext.read.parquet("one days of data in parquet format")
> val historyRaw = sqlContext.read.parquet("90 days of history data in parquet format")
>
> // the trialRaw will be very small, normally only thousands of rows out of the 20M rows of one day's data
> val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 0).selectExpr("e1 as account_id", "visid_high", "visid_low", "ip")
>
> trialRaw.count
> res0: Long = 1494
>
> // so the trialRaw data is small
>
> val join = historyRaw.join(broadcast(trialRaw),
>   trialRaw("visid_high") === historyRaw("visid_high") &&
>   trialRaw("visid_low") === historyRaw("visid_low") &&
>   trialRaw("date_time") > historyRaw("date_time"))
>
> val col_1 = trialRaw("visid_high")
> val col_2 = trialRaw("visid_low")
> val col_3 = trialRaw("date_time")
> val col_4 = trialRaw("ip")
>
> // drop the duplicate columns after the join
> val output = join.drop(col_1).drop(col_2).drop(col_3).drop(col_4)
> output.write.parquet("hdfs location")
>
> First problem: I think I am hitting SPARK-10309:
>
> Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
>
>
> so I have to disable tungsten (spark.sql.tungsten.enabled=false).
>
> Now the problem is that Spark finishes this job very slowly, even slower than the same logic done in Hive.
> The explain shows the broadcast join is used:
> join.explain(true)
>
> .....
> == Physical Plan ==
> Filter (date_time#25L > date_time#519L)
>  BroadcastHashJoin [visid_high#954L,visid_low#955L], [visid_high#460L,visid_low#461L],
BuildRight
>   ConvertToUnsafe
>    Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
>   ConvertToUnsafe
>    Project [soid_e1#30 AS account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>     Filter (instr(event_list#105,202) > 0)
>      Scan ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> Code Generation: true
>
> I don't understand the statistics shown in the GUI below:
>
> [stage statistics screenshot not preserved in the archive]
>
> It looks like the last task will shuffle-read all 506.6G of data, but this DOESN'T make any sense. The final output of 200 files is shown below:
>
> hadoop fs -ls hdfs://finalPath | sort -u -k5n
> Found 203 items
> -rw-r--r--   3 biginetl biginetl      44237 2016-03-18 16:47 finalPath/_common_metadata
> -rw-r--r--   3 biginetl biginetl     105534 2016-03-18 15:45 finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r--   3 biginetl biginetl     107098 2016-03-18 16:24 finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> .............
> -rw-r--r--   3 biginetl biginetl    1031400 2016-03-18 16:35 finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r--   3 biginetl biginetl    1173678 2016-03-18 16:21 finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r--   3 biginetl biginetl   12257423 2016-03-18 16:47 finalPath/_metadata
>
> As we can see, the largest file is only 1.1M, so the total output is only about 150M across all 200 files.
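>
> Side note on the 200 tiny files: that count comes from the shuffle-partition default, not from the data size. A sketch of compacting the output (assuming the same output DataFrame as above):

```scala
// The 200 output files come from spark.sql.shuffle.partitions (default 200).
// For ~150M of total output, a handful of files is plenty:
val compacted = output.coalesce(4)   // narrow dependency, no extra shuffle
compacted.write.parquet("hdfs location")

// Alternatively, lower the shuffle partition count before running the join:
sqlContext.setConf("spark.sql.shuffle.partitions", "20")
```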
> I really don't understand why stage 5 is so slow, and why the shuffle read is so BIG.
> Understanding the "broadcast" join in Spark 1.5 is very important for our use case. Please tell me what the reasons behind this could be.
>
> Thanks
>
> Yong
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

