spark-user mailing list archives

From Darren Govoni <dar...@ontrenet.com>
Subject RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
Date Tue, 22 Mar 2016 17:21:13 GMT

    
I hope you get an answer to this.  But are you able to try a newer version?


Sent from my Verizon Wireless 4G LTE smartphone

-------- Original message --------
From: Yong Zhang <java8964@hotmail.com> 
Date: 03/22/2016  1:08 PM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step 

Please help me understand how "broadcast" works on a DataFrame in Spark 1.5.2.
Below are the 2 joins I tested and the physical plans I dumped:
val join1 = historyRaw.join(trialRaw,
  trialRaw("visid_high") === historyRaw("visid_high") &&
  trialRaw("visid_low") === historyRaw("visid_low") &&
  trialRaw("date_time") > historyRaw("date_time"))

val join2 = historyRaw.join(broadcast(trialRaw),
  trialRaw("visid_high") === historyRaw("visid_high") &&
  trialRaw("visid_low") === historyRaw("visid_low") &&
  trialRaw("date_time") > historyRaw("date_time"))
join1.explain(true)

== Physical Plan ==
Filter (date_time#25L > date_time#513L)
 SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
  ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
   Exchange hashpartitioning(visid_high#948L,visid_low#949L)
    Scan ParquetRelation[hdfs://xxxxxxxx]
  ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
   Exchange hashpartitioning(visid_high#460L,visid_low#461L)
    Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
     Filter (instr(event_list#105,202) > 0)
      Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]

join2.explain(true)

== Physical Plan ==
Filter (date_time#25L > date_time#513L)
 BroadcastHashJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L], BuildRight
  Scan ParquetRelation[hdfs://xxxxxxxx]
  Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
   Filter (instr(event_list#105,202) > 0)
    Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
Obviously, the explain plans are different, but the performance and the job execution steps
are almost exactly the same, as shown in the original picture in the email below. Keep in mind
that I have to run with "--conf spark.sql.tungsten.enabled=false"; otherwise, the execution
plan will do the Tungsten sort.
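For reference, the same switch can also be flipped from inside the application rather than on the spark-submit command line. This is just a sketch; sqlContext is assumed to be the same SQLContext used for the reads:

```scala
// Equivalent to passing --conf spark.sql.tungsten.enabled=false:
// disable Tungsten for this SQLContext so the plan falls back to ExternalSort.
sqlContext.setConf("spark.sql.tungsten.enabled", "false")
```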
Now what confuses me is the following: when using the broadcast join, the job still generates
3 stages, the same as SortMergeJoin, and I am not sure that makes sense. Ideally, with "broadcast",
the first stage scans the "trialRaw" data, applying the filter (instr(event_list#105,202) >
0), which BTW filters out 99% of the data, then broadcasts the remaining data to all the nodes.
Then scan "historyRaw", filtering by joining with the broadcast data, and in the end save
the data in the default "200" partitions. So ONLY 2 stages should be enough for this case.
Why are there still 3 stages, just the same as "SortMergeJoin"? It looks like "broadcast" is
not taking effect at all, even though the physical plan clearly shows the "Broadcast" hint?
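To make my mental model explicit, here is the build/probe behavior I would expect from a broadcast hash join, sketched in plain Scala. This is only an illustration of the mechanism, not Spark's actual implementation; the class and field names are made up:

```scala
// Sketch of a broadcast hash join: hash the small (broadcast) side by the
// equi-join keys, stream the large side against it, and apply the non-equi
// predicate (trial.date_time > history.date_time) as a filter after the
// lookup, matching the Filter node sitting above BroadcastHashJoin in the plan.
object BroadcastJoinSketch {
  case class Trial(visidHigh: Long, visidLow: Long, dateTime: Long)
  case class History(visidHigh: Long, visidLow: Long, dateTime: Long, payload: String)

  // Build phase: group the small side by the equi-join keys (the "broadcast").
  def buildSide(trials: Seq[Trial]): Map[(Long, Long), Seq[Trial]] =
    trials.groupBy(t => (t.visidHigh, t.visidLow))

  // Probe phase: stream the large side; no shuffle of the large side is needed.
  def probe(history: Seq[History],
            built: Map[(Long, Long), Seq[Trial]]): Seq[(History, Trial)] =
    for {
      h <- history
      t <- built.getOrElse((h.visidHigh, h.visidLow), Nil)
      if t.dateTime > h.dateTime // non-equi part, applied after the hash lookup
    } yield (h, t)
}
```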
Thanks
Yong

From: java8964@hotmail.com
To: user@spark.apache.org
Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
Date: Fri, 18 Mar 2016 16:54:16 -0400






Hi, Sparkers:
I have some questions related to generating the parquet output in Spark 1.5.2.
I have 2 data sets to join, and I know one is much smaller than the other one, so I have the
following test code:
val loadRaw = sqlContext.read.parquet("one day of data in parquet format")
val historyRaw = sqlContext.read.parquet("90 days of history data in parquet format")

// the trialRaw will be very small, normally only thousands of rows out of the 20M rows of one day's data
val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 0)
  .selectExpr("e1 as account_id", "visid_high", "visid_low", "date_time", "ip")
trialRaw.count
res0: Long = 1494

// so the trialRaw data is small
val join = historyRaw.join(broadcast(trialRaw),
  trialRaw("visid_high") === historyRaw("visid_high") &&
  trialRaw("visid_low") === historyRaw("visid_low") &&
  trialRaw("date_time") > historyRaw("date_time"))

val col_1 = trialRaw("visid_high")
val col_2 = trialRaw("visid_low")
val col_3 = trialRaw("date_time")
val col_4 = trialRaw("ip")

// drop the duplicate columns after the join
val output = join.drop(col_1).drop(col_2).drop(col_3).drop(col_4)
output.write.parquet("hdfs location")
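As a side note, instead of the explicit broadcast() hint, Spark 1.5 can also choose a broadcast join automatically when the estimated size of one side is under spark.sql.autoBroadcastJoinThreshold (10 MB by default). A sketch, assuming the same sqlContext as above:

```scala
// Raise the auto-broadcast threshold to ~100MB so a small table like trialRaw
// would be broadcast even without an explicit broadcast() hint.
// Note: the size Spark compares against is its own estimate, not the row count.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)
```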
First problem: I think I am facing SPARK-10309:

Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)

so I have to disable Tungsten (spark.sql.tungsten.enabled=false).
Now the problem is that Spark finishes this job very slowly, even slower than the same logic done
in Hive. The explain shows the broadcast join is used:

join.explain(true)
.....
== Physical Plan ==
Filter (date_time#25L > date_time#519L)
 BroadcastHashJoin [visid_high#954L,visid_low#955L], [visid_high#460L,visid_low#461L], BuildRight
  ConvertToUnsafe
   Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
  ConvertToUnsafe
   Project [soid_e1#30 AS account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
    Filter (instr(event_list#105,202) > 0)
     Scan ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
Code Generation: true
 I don't understand the statistics shown in the GUI below:              
It looks like the last task will shuffle-read all 506.6G of data, but this DOESN'T make any sense.
The final output of 200 files is shown below:
hadoop fs -ls hdfs://finalPath | sort -u -k5n
Found 203 items
-rw-r--r--   3 biginetl biginetl      44237 2016-03-18 16:47 finalPath/_common_metadata
-rw-r--r--   3 biginetl biginetl     105534 2016-03-18 15:45 finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
-rw-r--r--   3 biginetl biginetl     107098 2016-03-18 16:24 finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
.............
-rw-r--r--   3 biginetl biginetl    1031400 2016-03-18 16:35 finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
-rw-r--r--   3 biginetl biginetl    1173678 2016-03-18 16:21 finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
-rw-r--r--   3 biginetl biginetl   12257423 2016-03-18 16:47 finalPath/_metadata
As we can see, the largest file is only 1.1M, so the total output is just about 150M for all
200 files. I really don't understand why stage 5 is so slow, and why the shuffle read is so
BIG. Understanding the "broadcast" join in Spark 1.5 is very important for our use case,
so please tell me what the reasons behind this could be.
Thanks
Yong
