spark-user mailing list archives

From Sathi Chowdhury <sath...@yahoo.com.INVALID>
Subject Re: Tune hive query launched thru spark-yarn job.
Date Thu, 05 Sep 2019 14:40:13 GMT
What I can immediately think of is: since you are using IN in the WHERE clause for a series of timestamps, you could consider breaking that up and, for each epoch timestamp, loading your results into an intermediate staging table. Then do a final aggregate from that table, keeping the GROUP BY the same. As it is a SUM, it can be done in two steps. HTH
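
Roughly, a sketch of that two-step approach on Spark 1.6 (table names, column names, and epoch values below are placeholders, not the actual schema from the query further down; only the SUM measures are shown, since partial sums can safely be re-aggregated, and the staging table is assumed to already exist):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Spark 1.6 style setup; "staging_agg", "source_table", "dim1"/"dim2",
// "m1"/"m2" and "part_col" are placeholder names for illustration only.
val sc = new SparkContext(new SparkConf().setAppName("two-step-aggregation"))
val hiveContext = new HiveContext(sc)

// The epoch timestamps that were listed in the IN (...) clause (placeholder values).
val epochs = Seq("1567641600", "1567645200")

// Step 1: aggregate one epoch at a time into the staging table.
epochs.foreach { e =>
  hiveContext.sql(s"""
    INSERT INTO TABLE staging_agg
    SELECT dim1, dim2, SUM(m1) AS m1, SUM(m2) AS m2
    FROM source_table
    WHERE part_col = $e
    GROUP BY dim1, dim2
  """)
}

// Step 2: final roll-up over the much smaller staging table, same GROUP BY.
val result = hiveContext.sql("""
  SELECT dim1, dim2, SUM(m1) AS m1, SUM(m2) AS m2
  FROM staging_agg
  GROUP BY dim1, dim2
""")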



On Thursday, September 5, 2019, 5:10 AM, Himali Patel <himaliben.patel@thalesgroup.com>
wrote:

Hello all,

We have one use-case where we are aggregating billions of rows, and it does a huge shuffle.

Example: as per the ‘Job’ tab on the YARN UI, when the input size is around 350 GB, the shuffle size is >3 TB. This pushes non-DFS usage beyond the warning limit and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?
 
1. Our data is high in cardinality:
   # input rows are ~15 billion
   # output rows are ~13 billion
 
2. Spark version is 1.6, Hive is 1.1 (it's CDH).
   We query using a HiveContext in the Spark job (YARN is the resource manager).
   The HiveContext has configs as:
   .setConf("hive.exec.dynamic.partition.mode","nonstrict")
   .setConf("hive.exec.dynamic.partition","true")
   .setConf("hive.exec.stagingdir","/tmp/hive/")
 
  
 
3. Our aggregation is done using a single query as below:

SELECT
  <list of 16 dimension columns>,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
  <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
  <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
  <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
  <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
  <Epochtime1> AS <partition-column>, <Epochtime1> AS <column2>
FROM <table-name>
WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>,
                             <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
GROUP BY <list of 16 dimension columns>
 
4. Configs are:
spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000

How to tune this job?
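
(For illustration only: spark.sql.shuffle.partitions from the list above can also be overridden at runtime on the HiveContext, in the same way as the Hive settings earlier; the value shown is just a placeholder, not a recommendation.)

// Placeholder value; the right number depends on shuffle volume and cluster capacity.
hiveContext.setConf("spark.sql.shuffle.partitions", "2000")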
      

 
  
 


