spark-user mailing list archives

From Chanh Le <giaosu...@gmail.com>
Subject [Spark 2.0] How to optimise a query that shuffles a lot?
Date Thu, 04 Aug 2016 09:23:43 GMT
Hi everyone,

I have a query that finds the top buying users by topic from the cookie table.
The query takes almost 25 minutes to run on a cluster with 40 CPUs and 20 GB of RAM.
I am using Spark 2.0.

SELECT
  t1.gender_id,
  CASE WHEN t1.gender_id = 1
    THEN 'Male'
  WHEN t1.gender_id = 2
    THEN 'Female'
  ELSE 'Unknown' END AS     Gender,
  t1.topic_id,
  t2.topic_name_vn,
  t1.inmarket_id,
  CASE WHEN t3.inmarket_name IS NULL
    THEN 'Unknown'
  ELSE t3.inmarket_name END inmarket_name,
  sum(t1.viewable)          viewable
FROM ad_cookie_report t1 LEFT JOIN topic t2 ON (t1.topic_id = t2.topic_id)
  LEFT JOIN inmarket t3 ON (t1.inmarket_id = t3.inmarket_id)
WHERE time BETWEEN '2016-07-28-00' AND '2016-08-03-00' AND network_id = 10507 AND userid IN
  (SELECT DISTINCT user_id
   FROM fact_stats_conv_daily
   WHERE
     time BETWEEN '2016-08-01' AND '2016-08-03'
     AND network_id = 10507
     AND inmarket_id IN
         (20739, 24056, 20715))
GROUP BY gender_id, t1.topic_id, t2.topic_name_vn, t1.inmarket_id, t3.inmarket_name
ORDER BY viewable DESC
LIMIT 100


The job is divided into 4 stages.




The problem is that in the last 2 stages only 1 executor is reading data; the others are idle.
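One possible explanation for a single busy executor is key skew in the `Exchange hashpartitioning(userid#1325L, 10)` step of the physical plan: a shuffle sends every row with the same key to the same partition, so if a few user ids own most of the rows, one task does most of the reading. The Python sketch below is a hypothetical illustration only. It uses `key % num_partitions` as a stand-in for Spark's Murmur3-based partitioner, and the user ids and row counts are invented.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count rows per shuffle partition under a simple modulo partitioner.

    Assumption: Spark actually uses a Murmur3 hash of the key, but any
    deterministic hash shares the property shown here: equal keys always
    land in the same partition.
    """
    sizes = Counter(key % num_partitions for key in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]


def skew_ratio(sizes):
    """Largest partition divided by the mean partition size (1.0 = balanced)."""
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean if mean else 0.0


# Hypothetical data: 1,000 distinct users with one row each,
# plus one "hot" user id (42) that owns 9,000 extra rows.
balanced = list(range(1000))
skewed = balanced + [42] * 9000

print(partition_sizes(balanced, 10))  # 100 rows in every partition
print(partition_sizes(skewed, 10))    # partition 2 holds 9,100 of 10,000 rows
print(round(skew_ratio(partition_sizes(skewed, 10)), 2))  # 9.1
```

Counting rows per `userid` on the real table (for example with `GROUP BY userid ORDER BY count(*) DESC`) would show whether such a hot key actually exists.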




Currently I set the number of shuffle partitions to 10. Do I need to increase that number?
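With `spark.sql.shuffle.partitions` set to 10 (the `, 10)` in each `Exchange hashpartitioning(...)` of the physical plan), every stage after a shuffle runs 10 tasks, so at most 10 of the 40 cores can work in those stages. The plain-Python sketch below compares the largest partition for evenly spread keys and for keys with one hot user as the partition count grows. Assumptions: all 40 CPUs are available to executors, a modulo partitioner stands in for Spark's hash, and the data is invented.

```python
from collections import Counter


def max_partition_rows(keys, num_partitions):
    """Rows in the largest partition; modulo is a stand-in for Spark's hash partitioner."""
    return max(Counter(key % num_partitions for key in keys).values())


def busy_cores(num_partitions, total_cores):
    """A post-shuffle stage has one task per partition, so at most this many cores work at once."""
    return min(num_partitions, total_cores)


uniform = list(range(100_000))   # hypothetical: 100,000 users, one row each
hot = uniform + [7] * 50_000     # hypothetical: one user id (7) with 50,000 extra rows

for n in (10, 40, 200):
    # Columns: partitions, busy cores (of an assumed 40), largest partition
    # with uniform keys, largest partition with the hot key.
    print(n, busy_cores(n, 40), max_partition_rows(uniform, n), max_partition_rows(hot, n))
# 10 10 10000 60000
# 40 40 2500 52500
# 200 40 500 50500
```

Under these assumptions, raising the setting (in SQL, `SET spark.sql.shuffle.partitions=200`; in code, `spark.conf.set("spark.sql.shuffle.partitions", "200")`) lets more cores work and shrinks partitions when keys are spread out, but a single hot key still ends up in one partition of roughly the same size.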

Query plans:


== Parsed Logical Plan ==
'GlobalLimit 100
+- 'LocalLimit 100
   +- 'Sort ['viewable DESC], true
      +- 'Aggregate ['gender_id, 't1.topic_id, 't2.topic_name_vn, 't1.inmarket_id, 't3.inmarket_name], ['t1.gender_id, CASE WHEN ('t1.gender_id = 1) THEN Nam WHEN ('t1.gender_id = 2) THEN Nữ ELSE Unknown END AS Gender#897, 't1.topic_id, 't2.topic_name_vn, 't1.inmarket_id, CASE WHEN isnull('t3.inmarket_name) THEN Unknown ELSE 't3.inmarket_name END AS inmarket_name#898, 'sum('t1.viewable) AS viewable#899]
         +- 'Filter (((('time >= 2016-07-28-00) && ('time <= 2016-08-03-00)) && ('network_id = 10507)) && 'userid IN (list#900))
            :  +- 'SubqueryAlias list#900
            :     +- 'Distinct
            :        +- 'Project ['user_id]
            :           +- 'Filter (((('time >= 2016-08-01) && ('time <= 2016-08-03)) && ('network_id = 10507)) && 'inmarket_id IN (20739,24056,20715))
            :              +- 'UnresolvedRelation `fact_stats_conv_daily`
            +- 'Join LeftOuter, ('t1.inmarket_id = 't3.inmarket_id)
               :- 'Join LeftOuter, ('t1.topic_id = 't2.topic_id)
               :  :- 'UnresolvedRelation `ad_cookie_report`, t1
               :  +- 'UnresolvedRelation `topic`, t2
               +- 'UnresolvedRelation `inmarket`, t3

== Analyzed Logical Plan ==
gender_id: int, Gender: string, topic_id: int, topic_name_vn: string, inmarket_id: int, inmarket_name: string, viewable: bigint
GlobalLimit 100
+- LocalLimit 100
   +- Sort [viewable#899L DESC], true
      +- Aggregate [gender_id#1323, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#1341], [gender_id#1323, CASE WHEN (gender_id#1323 = 1) THEN Nam WHEN (gender_id#1323 = 2) THEN Nữ ELSE Unknown END AS Gender#897, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, CASE WHEN isnull(inmarket_name#1341) THEN Unknown ELSE inmarket_name#1341 END AS inmarket_name#898, sum(cast(viewable#1327 as bigint)) AS viewable#899L]
         +- Filter ((((time#1302 >= 2016-07-28-00) && (time#1302 <= 2016-08-03-00)) && (network_id#1333 = 10507)) && predicate-subquery#900 [(userid#1325L = user_id#1369L)])
            :  +- SubqueryAlias predicate-subquery#900 [(userid#1325L = user_id#1369L)]
            :     +- Distinct
            :        +- Project [user_id#1369L]
            :           +- Filter ((((time#1346 >= 2016-08-01) && (time#1346 <= 2016-08-03)) && (network_id#1385 = 10507)) && cast(inmarket_id#1349L as bigint) IN (cast(20739 as bigint),cast(24056 as bigint),cast(20715 as bigint)))
            :              +- SubqueryAlias fact_stats_conv_daily
            :                 +- Relation[time#1346,topic_id#1347L,interest_id#1348L,inmarket_id#1349L,os_id#1350L,browser_id#1351L,device_type#1352L,device_id#1353L,location_id#1354L,age_id#1355L,gender_id#1356L,website_id#1357L,channel_id#1358L,section_id#1359L,zone_id#1360L,placment_id#1361L,advertiser_id#1362L,campaign_id#1363L,payment_id#1364L,creative_id#1365L,audience_id#1366L,merchant_cate#1367,product_id#1368,user_id#1369L,... 16 more fields] parquet
            +- Join LeftOuter, (cast(inmarket_id#1315 as bigint) = inmarket_id#1340L)
               :- Join LeftOuter, (cast(topic_id#1313 as bigint) = topic_id#1334L)
               :  :- SubqueryAlias t1
               :  :  +- Relation[time#1302,advertiser_id#1303,campaign_id#1304,payment_id#1305,creative_id#1306,website_id#1307,channel_id#1308,section_id#1309,zone_id#1310,ad_default#1311,placment_id#1312,topic_id#1313,interest_id#1314,inmarket_id#1315,audience_id#1316,os_id#1317,browser_id#1318,device_type#1319,device_id#1320,location_id#1321,age_id#1322,gender_id#1323,merchant_cate#1324,userid#1325L,... 8 more fields] parquet
               :  +- SubqueryAlias t2
               :     +- Relation[topic_id#1334L,topic_name_vn#1335,topic_name_en#1336,parent_id#1337L,full_parent#1338,level_id#1339L] parquet
               +- SubqueryAlias t3
                  +- Relation[inmarket_id#1340L,inmarket_name#1341,parent_id#1342L,full_parent#1343,status#1344L,level_id#1345L] parquet

== Optimized Logical Plan ==
GlobalLimit 100
+- LocalLimit 100
   +- Sort [viewable#899L DESC], true
      +- Aggregate [gender_id#1323, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#1341], [gender_id#1323, CASE WHEN (gender_id#1323 = 1) THEN Nam WHEN (gender_id#1323 = 2) THEN Nữ ELSE Unknown END AS Gender#897, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, CASE WHEN isnull(inmarket_name#1341) THEN Unknown ELSE inmarket_name#1341 END AS inmarket_name#898, sum(cast(viewable#1327 as bigint)) AS viewable#899L]
         +- Project [topic_id#1313, inmarket_id#1315, gender_id#1323, viewable#1327, topic_name_vn#1335, inmarket_name#1341]
            +- Join LeftOuter, (cast(inmarket_id#1315 as bigint) = inmarket_id#1340L)
               :- Project [topic_id#1313, inmarket_id#1315, gender_id#1323, viewable#1327, topic_name_vn#1335]
               :  +- Join LeftOuter, (cast(topic_id#1313 as bigint) = topic_id#1334L)
               :     :- Project [topic_id#1313, inmarket_id#1315, gender_id#1323, viewable#1327]
               :     :  +- Join LeftSemi, (userid#1325L = user_id#1369L)
               :     :     :- Filter ((((isnotnull(time#1302) && isnotnull(network_id#1333)) && (time#1302 >= 2016-07-28-00)) && (time#1302 <= 2016-08-03-00)) && (network_id#1333 = 10507))
               :     :     :  +- Relation[time#1302,advertiser_id#1303,campaign_id#1304,payment_id#1305,creative_id#1306,website_id#1307,channel_id#1308,section_id#1309,zone_id#1310,ad_default#1311,placment_id#1312,topic_id#1313,interest_id#1314,inmarket_id#1315,audience_id#1316,os_id#1317,browser_id#1318,device_type#1319,device_id#1320,location_id#1321,age_id#1322,gender_id#1323,merchant_cate#1324,userid#1325L,... 8 more fields] parquet
               :     :     +- Aggregate [user_id#1369L], [user_id#1369L]
               :     :        +- Project [user_id#1369L]
               :     :           +- Filter (((((isnotnull(network_id#1385) && isnotnull(time#1346)) && (time#1346 >= 2016-08-01)) && (time#1346 <= 2016-08-03)) && (network_id#1385 = 10507)) && inmarket_id#1349L IN (20739,24056,20715))
               :     :              +- Relation[time#1346,topic_id#1347L,interest_id#1348L,inmarket_id#1349L,os_id#1350L,browser_id#1351L,device_type#1352L,device_id#1353L,location_id#1354L,age_id#1355L,gender_id#1356L,website_id#1357L,channel_id#1358L,section_id#1359L,zone_id#1360L,placment_id#1361L,advertiser_id#1362L,campaign_id#1363L,payment_id#1364L,creative_id#1365L,audience_id#1366L,merchant_cate#1367,product_id#1368,user_id#1369L,... 16 more fields] parquet
               :     +- Project [topic_id#1334L, topic_name_vn#1335]
               :        +- Relation[topic_id#1334L,topic_name_vn#1335,topic_name_en#1336,parent_id#1337L,full_parent#1338,level_id#1339L] parquet
               +- Project [inmarket_id#1340L, inmarket_name#1341]
                  +- Relation[inmarket_id#1340L,inmarket_name#1341,parent_id#1342L,full_parent#1343,status#1344L,level_id#1345L] parquet

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[viewable#899L DESC], output=[gender_id#1323,Gender#897,topic_id#1313,topic_name_vn#1335,inmarket_id#1315,inmarket_name#898,viewable#899L])
+- *HashAggregate(keys=[gender_id#1323, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#1341], functions=[sum(cast(viewable#1327 as bigint))], output=[gender_id#1323, Gender#897, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#898, viewable#899L])
   +- Exchange hashpartitioning(gender_id#1323, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#1341, 10)
      +- *HashAggregate(keys=[gender_id#1323, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#1341], functions=[partial_sum(cast(viewable#1327 as bigint))], output=[gender_id#1323, topic_id#1313, topic_name_vn#1335, inmarket_id#1315, inmarket_name#1341, sum#1698L])
         +- *Project [topic_id#1313, inmarket_id#1315, gender_id#1323, viewable#1327, topic_name_vn#1335, inmarket_name#1341]
            +- *BroadcastHashJoin [cast(inmarket_id#1315 as bigint)], [inmarket_id#1340L], LeftOuter, BuildRight
               :- *Project [topic_id#1313, inmarket_id#1315, gender_id#1323, viewable#1327, topic_name_vn#1335]
               :  +- *BroadcastHashJoin [cast(topic_id#1313 as bigint)], [topic_id#1334L], LeftOuter, BuildRight
               :     :- *Project [topic_id#1313, inmarket_id#1315, gender_id#1323, viewable#1327]
               :     :  +- SortMergeJoin [userid#1325L], [user_id#1369L], LeftSemi
               :     :     :- *Sort [userid#1325L ASC], false, 0
               :     :     :  +- Exchange hashpartitioning(userid#1325L, 10)
               :     :     :     +- *Project [time#1302, advertiser_id#1303, campaign_id#1304, payment_id#1305, creative_id#1306, website_id#1307, channel_id#1308, section_id#1309, zone_id#1310, ad_default#1311, placment_id#1312, topic_id#1313, interest_id#1314, inmarket_id#1315, audience_id#1316, os_id#1317, browser_id#1318, device_type#1319, device_id#1320, location_id#1321, age_id#1322, gender_id#1323, merchant_cate#1324, userid#1325L, ... 8 more fields]
               :     :     :        +- *BatchedScan parquet default.ad_cookie_report[advertiser_id#1303,campaign_id#1304,payment_id#1305,creative_id#1306,website_id#1307,channel_id#1308,section_id#1309,zone_id#1310,ad_default#1311,placment_id#1312,topic_id#1313,interest_id#1314,inmarket_id#1315,audience_id#1316,os_id#1317,browser_id#1318,device_type#1319,device_id#1320,location_id#1321,age_id#1322,gender_id#1323,merchant_cate#1324,userid#1325L,impression#1326,... 8 more fields] Format: ParquetFormat, InputPaths: alluxio://master2:19998/AD_COOKIE_REPORT, PushedFilters: [], ReadSchema: struct<advertiser_id:int,campaign_id:int,payment_id:int,creative_id:int,website_id:int,channel_id...
               :     :     +- *Sort [user_id#1369L ASC], false, 0
               :     :        +- *HashAggregate(keys=[user_id#1369L], functions=[], output=[user_id#1369L])
               :     :           +- Exchange hashpartitioning(user_id#1369L, 10)
               :     :              +- *HashAggregate(keys=[user_id#1369L], functions=[], output=[user_id#1369L])
               :     :                 +- *Project [user_id#1369L]
               :     :                    +- *Filter inmarket_id#1349L IN (20739,24056,20715)
               :     :                       +- *BatchedScan parquet default.fact_stats_conv_daily[inmarket_id#1349L,user_id#1369L,time#1346,network_id#1385] Format: ParquetFormat, InputPaths: alluxio://master2:19998/FACT_STATS_CONV_DAILY, PushedFilters: [In(inmarket_id, [20739,24056,20715]], ReadSchema: struct<inmarket_id:bigint,user_id:bigint>
               :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :        +- *BatchedScan parquet default.topic[topic_id#1334L,topic_name_vn#1335] Format: ParquetFormat, InputPaths: alluxio://master2:19998/etl_info/TOPIC, PushedFilters: [], ReadSchema: struct<topic_id:bigint,topic_name_vn:string>
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
                  +- *BatchedScan parquet default.inmarket[inmarket_id#1340L,inmarket_name#1341] Format: ParquetFormat, InputPaths: alluxio://master2:19998/etl_info/INMARKET, PushedFilters: [], ReadSchema: struct<inmarket_id:bigint,inmarket_name:string>



Regards,
Chanh