spark-user mailing list archives

From Darren Govoni <dar...@ontrenet.com>
Subject Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
Date Fri, 18 Mar 2016 22:46:35 GMT
<html>
  <head>
    <meta content="text/html; charset=utf-8" http-equiv="Content-Type">
  </head>
  <body text="#000000" bgcolor="#FFFFFF">
    <div class="moz-cite-prefix">I wonder if this problem is the same
      one causing some of us the issue with our jobs hanging
      indefinitely on the last task/stage...<br>
      <br>
      hmmmm<br>
      <br>
      On 03/18/2016 04:57 PM, Ted Yu wrote:<br>
    </div>
    <blockquote
cite="mid:CALte62yoWOrJH96G5a44VhcSiApF6mYOdZP5rT=dqODoBqb-3A@mail.gmail.com"
      type="cite">
      <div dir="ltr">bq. <span style="font-size:12.8px">so I have to
          disable tungsten</span>
        <div><span style="font-size:12.8px"><br>
          </span></div>
        <div><span style="font-size:12.8px">Can you try the 1.6.1 release
            so that you don't have to disable tungsten?</span></div>
        <div><span style="font-size:12.8px"><br>
          </span></div>
        <div><span style="font-size:12.8px">Thanks</span></div>
      </div>
      <div class="gmail_extra"><br>
        <div class="gmail_quote">On Fri, Mar 18, 2016 at 1:54 PM, Yong
          Zhang <span dir="ltr">&lt;<a moz-do-not-send="true"
              href="mailto:java8964@hotmail.com" target="_blank">java8964@hotmail.com</a>&gt;</span>
          wrote:<br>
          <blockquote class="gmail_quote" style="margin:0 0 0
            .8ex;border-left:1px #ccc solid;padding-left:1ex">
            <div>
              <div dir="ltr">
                <div dir="ltr">Hi, Sparkers:
                  <div><br>
                  </div>
                  <div>I have some questions related to generating
                    parquet output in Spark 1.5.2.</div>
                  <div><br>
                  </div>
                  <div>I have 2 data sets to join, and I know one is
                    much smaller than the other, so I have the
                    following test code:</div>
                  <div><br>
                  </div>
                  <div>val loadRaw = sqlContext.read.parquet("one day
                    of data in parquet format")</div>
                  <div>val historyRaw = sqlContext.read.parquet("90 days
                    of history data in parquet format")</div>
                  <div><br>
                  </div>
                  <div>// trialRaw will be very small, normally only
                    thousands of rows out of the 20M in one day's data</div>
                  <div>val trialRaw =
                    loadRaw.filter(instr(loadRaw("event_list"), "202")
                    &gt; 0).selectExpr("e1 as account_id", "visid_high",
                    "visid_low", "ip")</div>
                  <div><br>
                  </div>
                  <div>trialRaw.count</div>
                  <div><b>res0: Long = 1494</b></div>
                  <div><br>
                  </div>
                  <div>// so the trialRaw data is small</div>
                  <div><br>
                  </div>
                  <div>val join = historyRaw.join(broadcast(trialRaw),
                    trialRaw("visid_high") === historyRaw("visid_high")
                    &amp;&amp; trialRaw("visid_low") === historyRaw("visid_low")
                    &amp;&amp; trialRaw("date_time") &gt; historyRaw("date_time"))</div>
                  <div>
                    <pre style="font-family:Menlo;font-size:12pt;background-color:rgb(255,255,255)">val col_1 = trialRaw("visid_high")
val col_2 = trialRaw("visid_low")
val col_3 = trialRaw("date_time")
val col_4 = trialRaw("ip")</pre>
                  </div>
                  <div>// drop the duplicate columns after join</div>
                  <div>val output =
                    join.drop(col_1).drop(col_2).drop(col_3).drop(col_4)</div>
                  <div>output.write.parquet("hdfs location")</div>
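                  <div><br>
                  </div>
                  <div>[Editor's note: the following is a hedged sketch, not code from the thread. If you move to a 1.6.x release as suggested later, the join variant that takes a Seq of column names keeps a single copy of each equi-join key, so the four drop() calls on the duplicated columns become unnecessary; the non-equi date_time predicate is applied as a filter afterwards, which is what the Filter-over-BroadcastHashJoin physical plan shown later does anyway. The "trial_date_time" name is an illustrative rename.</div>

```scala
// Hedged sketch (my addition, assumes Spark 1.6.x for the Seq-based join):
// rename the small side's date_time so it survives the join unambiguously,
// join on the shared key names, then apply the range predicate as a filter.
import org.apache.spark.sql.functions.{broadcast, col}

val small  = trialRaw.withColumnRenamed("date_time", "trial_date_time")
val joined = historyRaw
  .join(broadcast(small), Seq("visid_high", "visid_low"))  // one copy of each key
  .where(col("trial_date_time") > col("date_time"))        // non-equi condition
  .drop("trial_date_time")
```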
                  <div><br>
                  </div>
                  <div>First problem, I think I am facing <a
                      moz-do-not-send="true"
                      href="https://issues.apache.org/jira/browse/SPARK-10309"
                      target="_blank">Spark-10309</a></div>
                  <div>
                    <pre>Caused by: java.io.IOException: Unable to acquire 67108864
bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.&lt;init&gt;(UnsafeExternalSorter.java:138)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)</pre>
                  </div>
                  <div><br>
                  </div>
                  <div>so I have to disable tungsten (<span
style="font-family:Menlo;font-size:12pt;background-color:rgb(255,255,255)">spark.sql.tungsten.enabled=false</span>).</div>
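                  <div>[Editor's note, not from the original mail: besides setting it in code, the flag can be passed at launch time; the class and jar names below are hypothetical, only the --conf flag is the point.]</div>

```shell
# Hypothetical spark-submit invocation; only the tungsten --conf flag
# reflects the thread, everything else is placeholder.
spark-submit \
  --conf spark.sql.tungsten.enabled=false \
  --class com.example.JoinJob \
  my-join-job.jar
```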
                  <div><span style="font-size:12pt"><br>
                    </span></div>
                  <div>Now the problem is that Spark finishes this job
                    very slowly, even slower than the same logic in Hive.</div>
                  <div>The explain shows the broadcast join is used:</div>
                  <div>join.explain(true)</div>
                  <div><br>
                  </div>
                  <div>.....</div>
                  <div>
                    <div>== Physical Plan ==</div>
                    <div>Filter (date_time#25L &gt; date_time#519L)</div>
                    <div> <b>BroadcastHashJoin</b>
                      [visid_high#954L,visid_low#955L],
                      [visid_high#460L,visid_low#461L], BuildRight</div>
                    <div>  ConvertToUnsafe</div>
                    <div>   Scan ParquetRelation[hdfs://xxxxxx][400+
                      columns shown up here]</div>
                    <div>  ConvertToUnsafe</div>
                    <div>   Project [soid_e1#30 AS
                      account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]</div>
                    <div>    Filter (instr(event_list#105,202) &gt; 0)</div>
                    <div>     Scan
ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]</div>
                    <div>Code Generation: true</div>
                  </div>
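                  <div>[Editor's note, not from the thread: in Spark 1.5 a broadcast join is only chosen automatically when the build side is estimated below spark.sql.autoBroadcastJoinThreshold (10 MB by default); the explicit broadcast() hint above forces it regardless of the estimate. The threshold can be inspected and raised like this; the 100 MB value is illustrative.]</div>

```scala
// Hedged sketch: read and adjust the automatic broadcast threshold.
// 10485760 (10 MB) is the Spark 1.5 default; 100 MB here is only an example.
sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
  (100 * 1024 * 1024).toString)
```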
                  <div><br>
                  </div>
                  <div>I don't understand the statistics shown in the
                    Spark UI screenshot below (attached image; the last
                    stage reports a shuffle read of 506.6 GB):</div>
                  <div>[screenshot: Spark UI stage statistics]<br>
                  </div>
                  <div>It looks like the last stage shuffle-reads all
                    506.6G of data, but this DOESN'T make any sense. The
                    final output of 200 files is shown below:</div>
                  <div><br>
                  </div>
                  <div><i>hadoop fs -ls hdfs://finalPath | sort -u -k5n</i></div>
                  <div>
                    <div><i>Found 203 items</i></div>
                    <div><i>-rw-r--r--   3 biginetl biginetl      44237
                        2016-03-18 16:47 finalPath/_common_metadata</i></div>
                    <div><i>-rw-r--r--   3 biginetl biginetl     105534
                        2016-03-18
15:45 finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet</i></div>
                    <div><i>-rw-r--r--   3 biginetl biginetl     107098
                        2016-03-18
16:24 finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet</i></div>
                  </div>
                  <div><i>.............</i></div>
                  <div>
                    <div><i>-rw-r--r--   3 biginetl biginetl    1031400
                        2016-03-18
16:35 finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet</i></div>
                    <div><i>-rw-r--r--   3 biginetl biginetl    1173678
                        2016-03-18
16:21 finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet</i></div>
                    <div><i>-rw-r--r--   3 biginetl biginetl   12257423
                        2016-03-18 16:47 finalPath/_metadata</i></div>
                  </div>
                  <div><br>
                  </div>
                  <div>As we can see, the largest file is only 1.1M, so
                    the total output is just about 150M for all 200
                    files.</div>
                  <div><span style="font-size:12pt">I really don't
                      understand why stage 5 is so slow, and why the
                      shuffle read is so BIG. </span></div>
                  <div>Understanding the "broadcast" join in Spark 1.5
                    is very important for our use case. Please let me
                    know what the reasons behind this could be.</div>
                  <div><br>
                  </div>
                  <div>Thanks</div>
                  <span class="HOEnZb"><font color="#888888">
                      <div><br>
                      </div>
                      <div>Yong</div>
                      <div><br>
                      </div>
                    </font></span></div>
              </div>
            </div>
          </blockquote>
        </div>
        <br>
      </div>
    </blockquote>
    <br>
  </body>
</html>
