spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
Date Wed, 23 Mar 2016 23:20:23 GMT
On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang <java8964@hotmail.com> wrote:
> Here is the output:
>
> == Parsed Logical Plan ==
> Project [400+ columns]
> +- Project [400+ columns]
>    +- Project [400+ columns]
>       +- Project [400+ columns]
>          +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
>             :- Relation[400+ columns] ParquetRelation
>             +- BroadcastHint
>                +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>                   +- Filter (instr(event_list#105,202) > 0)
>                      +- Relation[400+ columns] ParquetRelation
>
> == Analyzed Logical Plan ==
> 400+ columns
> Project [400+ columns]
> +- Project [400+ columns]
>    +- Project [400+ columns]
>       +- Project [400+ columns]
>          +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
>             :- Relation[400+ columns] ParquetRelation
>             +- BroadcastHint
>                +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>                   +- Filter (instr(event_list#105,202) > 0)
>                      +- Relation[400+ columns] ParquetRelation
>
> == Optimized Logical Plan ==
> Project [400+ columns]
> +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
>    :- Relation[400+ columns] ParquetRelation
>    +- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
>       +- BroadcastHint
>          +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>             +- Filter (instr(event_list#105,202) > 0)
>                +- Relation[400+ columns] ParquetRelation

There is a Project on top of the BroadcastHint, inserted by the
column pruning rule, which makes SparkStrategies unable to recognize
the BroadcastHint anymore. It was fixed recently in master [1]:

https://github.com/apache/spark/pull/11260

Your join should run as expected in master.
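Until you can upgrade, a possible workaround (just a sketch on my side,
not verified on 1.5.2/1.6.1): apply the hint to a DataFrame that already
contains exactly the columns the query needs, so the column pruning rule
has no extra columns to prune above the BroadcastHint:

// hypothetical workaround sketch: pre-prune the columns yourself,
// then put the broadcast hint on the already-pruned DataFrame
val trialSlim = trialRaw.select("account_id", "visid_high", "visid_low", "date_time")
val join2 = historyRaw.join(broadcast(trialSlim),
  trialSlim("visid_high") === historyRaw("visid_high") &&
  trialSlim("visid_low") === historyRaw("visid_low") &&
  trialSlim("date_time") > historyRaw("date_time"))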

> == Physical Plan ==
> Project [400+ columns]
> +- Filter (date_time#25L > date_time#513L)
>    +- SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
>       :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
>       :  +- TungstenExchange hashpartitioning(visid_high#948L,visid_low#949L,200), None
>       :     +- Scan ParquetRelation[400+ columns] InputPaths:
> hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
> hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14,
> hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17,
> hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,
> hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
> hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09,
> hdfs://xxx/2016/03/10, hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12,
> hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15,
> hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
>       +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
>          +- TungstenExchange hashpartitioning(visid_high#460L,visid_low#461L,200), None
>             +- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
>                +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>                   +- Filter (instr(event_list#105,202) > 0)
>                      +- Scan ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L] InputPaths: hdfs://xxx/2016/03/17
>
> This dataset has more than 480 columns in the parquet file, so I replaced them
> with "400+ columns" to avoid blowing up the email, but I don't think this
> has anything to do with the "broadcast" problem.
>
> Thanks
>
> Yong
>
>
>> Date: Wed, 23 Mar 2016 10:14:19 -0700
>> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in
>> the last step
>> From: davies@databricks.com
>> To: java8964@hotmail.com
>> CC: user@spark.apache.org
>
>>
>> The broadcast hint does not work as expected in this case. Could you
>> also show the logical plan from 'explain(true)'?
>>
>> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8964@hotmail.com> wrote:
>> >
>> > So I am testing this code to understand the "broadcast" feature of
>> > DataFrames on Spark 1.6.1.
>> > This time I did not disable "tungsten". Everything is at its default
>> > value, except the memory and cores of my job on 1.6.1.
>> >
>> > I am testing the join2 case
>> >
>> > val join2 = historyRaw.join(broadcast(trialRaw),
>> >   trialRaw("visid_high") === historyRaw("visid_high") &&
>> >   trialRaw("visid_low") === historyRaw("visid_low") &&
>> >   trialRaw("date_time") > historyRaw("date_time"))
>> >
>> > and here is the DAG visualization from the runtime of my test job:
>> >
>> > [DAG screenshot not preserved in the archive]
>> >
>> > So now, I don't understand how "broadcast" works on a DataFrame in
>> > Spark. I originally thought it would be the same as "mapjoin" in Hive,
>> > but can someone explain the DAG above to me?
>> >
>> > I have one day of data, about 1.5G as a compressed parquet file, filtered
>> > by "instr(loadRaw("event_list"), "202") > 0", which outputs only about 1494
>> > rows (very small); it is the "trialRaw" DF in my example.
>> > Stage 3 has a filter, which I thought is for the trialRaw data, but the
>> > stage statistics don't match the data. I don't know why the input is
>> > only 78M, and the shuffle write is about 97.6KB.
>> >
>> > [stage 3 statistics screenshot not preserved in the archive]
>> >
>> > The historyRaw will be about 90 days of history data, which should be
>> > about 100G, so it looks like stage 4 is scanning it.
>> >
>> > [stage 4 statistics screenshot not preserved in the archive]
>> >
>> > Now, my original thought was that the small data would be broadcast to
>> > all the nodes, and most of the history data would be filtered out by the
>> > join keys; at least that is what "mapjoin" in Hive would do. But from the
>> > DAG above, I didn't see it working this way.
>> > It looks more like Spark uses the SortMerge join to shuffle both datasets
>> > across the network, and filters on the "reducer" side by the join keys to
>> > get the final output. But that is not what the "broadcast" join is supposed
>> > to do, correct?
>> > The last stage is very slow, until it reaches and processes all the
>> > history data, with the "shuffle read" shown below reaching 720G, to finish.
>> >
>> > [stage statistics screenshot not preserved in the archive]
>> >
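>> > (A side check, just an untested sketch on my side: besides the hint, Spark
>> > will also broadcast the build side automatically when it estimates it to be
>> > under spark.sql.autoBroadcastJoinThreshold, so I could raise that and
>> > re-check which strategy gets picked:)
>> >
>> > // hypothetical check -- raise the auto-broadcast threshold to 100MB
>> > sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)
>> > join2.explain()  // look for BroadcastHashJoin vs SortMergeJoin
>> >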
>> > One thing I notice is that if tungsten is enabled, the shuffle write volume
>> > on stage 4 is larger (720G) than when tungsten is disabled (506G) in my
>> > original run, for exactly the same input. It is an interesting point; does
>> > anyone have an idea about this?
>> >
>> >
>> > Overall, for my test case, the "broadcast" join is exactly the most
>> > optimized approach I should use; but somehow, I cannot make it behave the
>> > same way as Hive's "mapjoin", even in Spark 1.6.1.
>> >
>> > As I said, this is just a test case. We have some business cases where it
>> > makes sense to use a "broadcast" join, but until I understand exactly how to
>> > make it work as I expect in Spark, I don't know what to do.
>> >
>> > Yong
>> >
>> > ________________________________
>> > From: java8964@hotmail.com
>> > To: user@spark.apache.org
>> > Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in
>> > the last step
>> > Date: Tue, 22 Mar 2016 13:08:31 -0400
>> >
>> >
>> > Please help me understand how "broadcast" works on a DataFrame in Spark
>> > 1.5.2.
>> >
>> > Below are the 2 joins I tested and the physical plans I dumped:
>> >
>> > val join1 = historyRaw.join(trialRaw,
>> >   trialRaw("visid_high") === historyRaw("visid_high") &&
>> >   trialRaw("visid_low") === historyRaw("visid_low") &&
>> >   trialRaw("date_time") > historyRaw("date_time"))
>> > val join2 = historyRaw.join(broadcast(trialRaw),
>> >   trialRaw("visid_high") === historyRaw("visid_high") &&
>> >   trialRaw("visid_low") === historyRaw("visid_low") &&
>> >   trialRaw("date_time") > historyRaw("date_time"))
>> >
>> > join1.explain(true)
>> > == Physical Plan ==
>> > Filter (date_time#25L > date_time#513L)
>> >  SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
>> >   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>> >    Exchange hashpartitioning(visid_high#948L,visid_low#949L)
>> >     Scan ParquetRelation[hdfs://xxxxxxxx]
>> >   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>> >    Exchange hashpartitioning(visid_high#460L,visid_low#461L)
>> >     Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>> >      Filter (instr(event_list#105,202) > 0)
>> >       Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>> >
>> > join2.explain(true)
>> > == Physical Plan ==
>> > Filter (date_time#25L > date_time#513L)
>> >  BroadcastHashJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L], BuildRight
>> >   Scan ParquetRelation[hdfs://xxxxxxxx]
>> >   Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>> >    Filter (instr(event_list#105,202) > 0)
>> >     Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>> >
>> > Obviously, the explain plans are different, but the performance and the
>> > job execution steps are almost exactly the same, as shown in the original
>> > picture in the email below.
>> > Keep in mind that I have to run with "--conf
>> > spark.sql.tungsten.enabled=false"; otherwise, the execution plan will do
>> > the tungsten sort.
>> >
>> > Now what confuses me is the following:
>> > When using the broadcast join, the job still generates 3 stages, the same
>> > as SortMergeJoin, and I am not sure this makes sense.
>> > Ideally, with "broadcast", the first stage scans the "trialRaw" data using
>> > the filter (instr(event_list#105,202) > 0), which BTW filters out 99% of
>> > the data, then "broadcasts" the remaining data to all the nodes. Then it
>> > scans "historyRaw" while filtering it by joining with the broadcast data.
>> > In the end, we can say there is one more stage to save the data in the
>> > default "200" partitions. So ONLY 2 stages should be enough for this case.
>> > Why are there still 3 stages, just the same as "SortMergeJoin"? It looks
>> > like "broadcast" is not taking effect at all, even though the physical plan
>> > clearly shows the "Broadcast" hint.
>> >
>> > Thanks
>> >
>> > Yong
>> >
>> >
>> > ________________________________
>> > From: java8964@hotmail.com
>> > To: user@spark.apache.org
>> > Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the
>> > last step
>> > Date: Fri, 18 Mar 2016 16:54:16 -0400
>> >
>> > Hi, Sparkers:
>> >
>> > I have some questions related to generating the parquet output in Spark
>> > 1.5.2.
>> >
>> > I have 2 data sets to join, and I know one is much smaller than the
>> > other one, so I have the following test code:
>> >
>> > val loadRaw = sqlContext.read.parquet("one day of data in parquet format")
>> > val historyRaw = sqlContext.read.parquet("90 days of history data in parquet format")
>> >
>> > // trialRaw will be very small, normally only thousands of rows out of the
>> > // 20M rows of one day's data
>> > val trialRaw = loadRaw
>> >   .filter(instr(loadRaw("event_list"), "202") > 0)
>> >   .selectExpr("soid_e1 as account_id", "visid_high", "visid_low", "date_time", "ip")
>> >
>> > trialRaw.count
>> > res0: Long = 1494
>> >
>> > // so the trialRaw data is small
>> >
>> > val join = historyRaw.join(broadcast(trialRaw),
>> >   trialRaw("visid_high") === historyRaw("visid_high") &&
>> >   trialRaw("visid_low") === historyRaw("visid_low") &&
>> >   trialRaw("date_time") > historyRaw("date_time"))
>> >
>> > val col_1 = trialRaw("visid_high")
>> > val col_2 = trialRaw("visid_low")
>> > val col_3 = trialRaw("date_time")
>> > val col_4 = trialRaw("ip")
>> >
>> > // drop the duplicate columns after the join
>> > val output = join.drop(col_1).drop(col_2).drop(col_3).drop(col_4)
>> > output.write.parquet("hdfs location")
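>> > (An alternative I considered for the duplicate key columns, again just an
>> > untested sketch: join on the equi keys with a Seq so each key column appears
>> > only once, rename the small side's date_time to avoid ambiguity, and apply
>> > the non-equi condition as a filter:)
>> >
>> > val trial2 = trialRaw.withColumnRenamed("date_time", "trial_date_time")
>> > val output2 = historyRaw.join(broadcast(trial2), Seq("visid_high", "visid_low"))
>> >   .filter("trial_date_time > date_time")
>> >   .drop("trial_date_time")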
>> >
>> > The first problem: I think I am facing SPARK-10309
>> >
>> > Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
>> >     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
>> >     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
>> >     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
>> >
>> > so I have to disable tungsten (spark.sql.tungsten.enabled=false).
>> >
>> > Now the problem is that Spark finishes this job very slowly, even worse
>> > than the same logic done in Hive.
>> > The explain shows that the broadcast join is used:
>> > join.explain(true)
>> >
>> > .....
>> > == Physical Plan ==
>> > Filter (date_time#25L > date_time#519L)
>> >  BroadcastHashJoin [visid_high#954L,visid_low#955L], [visid_high#460L,visid_low#461L], BuildRight
>> >   ConvertToUnsafe
>> >    Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
>> >   ConvertToUnsafe
>> >    Project [soid_e1#30 AS account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>> >     Filter (instr(event_list#105,202) > 0)
>> >      Scan ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>> > Code Generation: true
>> >
>> > I don't understand the statistics shown in the GUI below:
>> >
>> > [job statistics screenshot not preserved in the archive]
>> >
>> > It looks like the last task will shuffle-read all 506.6G of data, but this
>> > DOESN'T make any sense. The final output of 200 files is shown below:
>> >
>> > hadoop fs -ls hdfs://finalPath | sort -u -k5n
>> > Found 203 items
>> > -rw-r--r--   3 biginetl biginetl    44237 2016-03-18 16:47 finalPath/_common_metadata
>> > -rw-r--r--   3 biginetl biginetl   105534 2016-03-18 15:45 finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
>> > -rw-r--r--   3 biginetl biginetl   107098 2016-03-18 16:24 finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
>> > .............
>> > -rw-r--r--   3 biginetl biginetl  1031400 2016-03-18 16:35 finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
>> > -rw-r--r--   3 biginetl biginetl  1173678 2016-03-18 16:21 finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
>> > -rw-r--r--   3 biginetl biginetl 12257423 2016-03-18 16:47 finalPath/_metadata
>> >
>> > As we can see, the largest file is only 1.1M, so the total output is
>> > just about 150M across all 200 files.
>> > I really don't understand why stage 5 is so slow, and why the shuffle
>> > read is so BIG.
>> > Understanding the "broadcast" join in Spark 1.5 is very important for
>> > our use case, so please tell me what the reasons behind this could be.
>> >
>> > Thanks
>> >
>> > Yong
>> >
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

