spark-user mailing list archives

From Сергей Романов <romano...@inbox.ru.INVALID>
Subject Spark 2.0: SQL runs 5x slower when adding a 29th field to an aggregation.
Date Thu, 01 Sep 2016 15:55:06 GMT
Hi, 

When I run a query like "SELECT field, SUM(x1), SUM(x2)... SUM(x28) FROM parquet_table WHERE
partition = 1 GROUP BY field" it runs in under 2 seconds, but when I add just one more aggregate
field to the query "SELECT field, SUM(x1), SUM(x2)... SUM(x28), SUM(x29) FROM parquet_table
WHERE partition = 1 GROUP BY field" it runs in about 12 seconds. 
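To rerun the comparison at any aggregate count without hand-editing SQL, here is a small hypothetical Python helper that builds the generic query shape described above. All names in it (`build_query`, `field`, `x1`..`xN`, `parquet_table`, `partition`) are placeholders taken from the description or made up for illustration. They are not the real table's columns.

```python
def build_query(n_aggregates, key="field", prefix="x",
                table="parquet_table", predicate="partition = 1"):
    """Build 'SELECT key, SUM(x1), ..., SUM(xN) FROM table WHERE ... GROUP BY key'.

    Hypothetical helper: every default name is a placeholder from the
    message, not a real column or table.
    """
    # One SUM(...) term per aggregate, numbered from 1 to n_aggregates.
    sums = ", ".join(f"SUM({prefix}{i})" for i in range(1, n_aggregates + 1))
    return (f"SELECT {key}, {sums} FROM {table} "
            f"WHERE {predicate} GROUP BY {key}")


if __name__ == "__main__":
    # The two cases being compared: 28 aggregates (fast) vs 29 (slow).
    for n in (28, 29):
        print(build_query(n))
```

Each generated string can be pasted into Beeline as-is. Only the number of `SUM` terms changes between the two cases.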

Why does this happen? Can I make the second query run as fast as the first one? I tried browsing
the logs in TRACE mode and comparing the generated code (CODEGEN output), but everything looks
essentially the same except for the execution time.

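Could this be related to SPARK-17115?

I'm using the Spark 2.0 Thrift Server over YARN/HDFS with partitioned Parquet Hive tables.

Complete example using Beeline. The 29-aggregate query (including SUM(`actual_dsp_fee`)) comes
first, followed by the same query without it (28 aggregates):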

0: jdbc:hive2://spark-master1.uslicer> DESCRIBE EXTENDED `slicer`.`573_slicer_rnd_13`;

col_name,data_type,comment 
actual_dsp_fee,float,NULL 
actual_pgm_fee,float,NULL 
actual_ssp_fee,float,NULL 
advertiser_id,int,NULL 
advertiser_spent,double,NULL 
anomaly_clicks,bigint,NULL 
anomaly_conversions_filtered,bigint,NULL 
anomaly_conversions_unfiltered,bigint,NULL 
anomaly_decisions,float,NULL 
bid_price,float,NULL 
campaign_id,int,NULL 
click_prob,float,NULL 
clicks,bigint,NULL 
clicks_static,bigint,NULL 
conv_prob,float,NULL 
conversion_id,bigint,NULL 
conversions,bigint,NULL 
creative_id,int,NULL 
dd_convs,bigint,NULL 
decisions,float,NULL 
dmp_liveramp_margin,float,NULL 
dmp_liveramp_payout,float,NULL 
dmp_nielsen_margin,float,NULL 
dmp_nielsen_payout,float,NULL 
dmp_rapleaf_margin,float,NULL 
dmp_rapleaf_payout,float,NULL 
e,float,NULL 
expected_cpa,float,NULL 
expected_cpc,float,NULL 
expected_payout,float,NULL 
first_impressions,bigint,NULL 
fraud_clicks,bigint,NULL 
fraud_impressions,bigint,NULL 
g,float,NULL 
impressions,float,NULL 
line_item_id,int,NULL 
mail_type,string,NULL 
noads,float,NULL 
predict_version,bigint,NULL 
publisher_id,int,NULL 
publisher_revenue,double,NULL 
pvc,bigint,NULL 
second_price,float,NULL 
thirdparty_margin,float,NULL 
thirdparty_payout,float,NULL 
dt,string,NULL 
etl_path,string,NULL 
# Partition Information,, 
# col_name,data_type,comment 
dt,string,NULL 
etl_path,string,NULL 


data_type  CatalogTable( 
        Table: `slicer`.`573_slicer_rnd_13` 
        Owner: spark 
        Created: Fri Aug 12 12:30:20 UTC 2016 
        Last Access: Thu Jan 01 00:00:00 UTC 1970 
        Type: MANAGED 
        Schema: [`actual_dsp_fee` float, `actual_pgm_fee` float, `actual_ssp_fee` float,
`advertiser_id` int, `advertiser_spent` double, `anomaly_clicks` bigint,
`anomaly_conversions_filtered` bigint, `anomaly_conversions_unfiltered` bigint,
`anomaly_decisions` float, `bid_price` float, `campaign_id` int, `click_prob` float,
`clicks` bigint, `clicks_static` bigint, `conv_prob` float, `conversion_id` bigint,
`conversions` bigint, `creative_id` int, `dd_convs` bigint, `decisions` float,
`dmp_liveramp_margin` float, `dmp_liveramp_payout` float, `dmp_nielsen_margin` float,
`dmp_nielsen_payout` float, `dmp_rapleaf_margin` float, `dmp_rapleaf_payout` float,
`e` float, `expected_cpa` float, `expected_cpc` float, `expected_payout` float,
`first_impressions` bigint, `fraud_clicks` bigint, `fraud_impressions` bigint, `g` float,
`impressions` float, `line_item_id` int, `mail_type` string, `noads` float,
`predict_version` bigint, `publisher_id` int, `publisher_revenue` double, `pvc` bigint,
`second_price` float, `thirdparty_margin` float, `thirdparty_payout` float,
`dt` string, `etl_path` string]
        Partition Columns: [`dt`, `etl_path`] 
        Properties: [transient_lastDdlTime=1471005020] 
        Storage(Location: hdfs://spark-master1.uslicer.net:8020/user/hive/warehouse/slicer.db/573_slicer_rnd_13,
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat:
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
Properties: [serialization.format=1])) 
comment     


0: jdbc:hive2://spark-master1.uslicer> EXPLAIN SELECT `advertiser_id` AS `advertiser_id`,
SUM(`conversions`) AS `conversions`, SUM(`dmp_rapleaf_margin`) AS `dmp_rapleaf_margin`,
SUM(`pvc`) AS `pvc`, SUM(`dmp_nielsen_payout`) AS `dmp_nielsen_payout`,
SUM(`fraud_clicks`) AS `fraud_clicks`, SUM(`impressions`) AS `impressions`,
SUM(`conv_prob`) AS `conv_prob`, SUM(`dmp_liveramp_payout`) AS `dmp_liveramp_payout`,
SUM(`decisions`) AS `decisions`, SUM(`fraud_impressions`) AS `fraud_impressions`,
SUM(`advertiser_spent`) AS `advertiser_spent`, SUM(`actual_ssp_fee`) AS `actual_ssp_fee`,
SUM(`dmp_nielsen_margin`) AS `dmp_nielsen_margin`, SUM(`first_impressions`) AS `first_impressions`,
SUM(`clicks`) AS `clicks`, SUM(`second_price`) AS `second_price`,
SUM(`click_prob`) AS `click_prob`, SUM(`clicks_static`) AS `clicks_static`,
SUM(`expected_payout`) AS `expected_payout`, SUM(`bid_price`) AS `bid_price`,
SUM(`noads`) AS `noads`, SUM(`e`) AS `e`, SUM(`g`) AS `g`,
SUM(`publisher_revenue`) AS `publisher_revenue`, SUM(`dmp_liveramp_margin`) AS `dmp_liveramp_margin`,
SUM(`actual_pgm_fee`) AS `actual_pgm_fee`, SUM(`dmp_rapleaf_payout`) AS `dmp_rapleaf_payout`,
SUM(`dd_convs`) AS `dd_convs`, SUM(`actual_dsp_fee`) AS `actual_dsp_fee`
FROM `slicer`.`573_slicer_rnd_13` WHERE dt = '2016-07-28' GROUP BY `advertiser_id` LIMIT 30;
plan  == Physical Plan == 
CollectLimit 30 
+- *HashAggregate(keys=[advertiser_id#13866], functions=[sum(conversions#13879L), sum(cast(dmp_rapleaf_margin#13887
as double)), sum(pvc#13904L), sum(cast(dmp_nielsen_payout#13886 as double)), sum(fraud_clicks#13894L),
sum(cast(impressions#13897 as double)), sum(cast(conv_prob#13877 as double)), sum(cast(dmp_liveramp_payout#13884
as double)), sum(cast(decisions#13882 as double)), sum(fraud_impressions#13895L), sum(advertiser_spent#13867),
sum(cast(actual_ssp_fee#13865 as double)), sum(cast(dmp_nielsen_margin#13885 as double)),
sum(first_impressions#13893L), sum(clicks#13875L), sum(cast(second_price#13905 as double)),
sum(cast(click_prob#13874 as double)), sum(clicks_static#13876L), sum(cast(expected_payout#13892
as double)), sum(cast(bid_price#13872 as double)), sum(cast(noads#13900 as double)), sum(cast(e#13889
as double)), sum(cast(g#13896 as double)), sum(publisher_revenue#13903), ... 5 more fields])
   +- Exchange hashpartitioning(advertiser_id#13866, 3) 
      +- *HashAggregate(keys=[advertiser_id#13866], functions=[partial_sum(conversions#13879L),
partial_sum(cast(dmp_rapleaf_margin#13887 as double)), partial_sum(pvc#13904L), partial_sum(cast(dmp_nielsen_payout#13886
as double)), partial_sum(fraud_clicks#13894L), partial_sum(cast(impressions#13897 as double)),
partial_sum(cast(conv_prob#13877 as double)), partial_sum(cast(dmp_liveramp_payout#13884 as
double)), partial_sum(cast(decisions#13882 as double)), partial_sum(fraud_impressions#13895L),
partial_sum(advertiser_spent#13867), partial_sum(cast(actual_ssp_fee#13865 as double)), partial_sum(cast(dmp_nielsen_margin#13885
as double)), partial_sum(first_impressions#13893L), partial_sum(clicks#13875L), partial_sum(cast(second_price#13905
as double)), partial_sum(cast(click_prob#13874 as double)), partial_sum(clicks_static#13876L),
partial_sum(cast(expected_payout#13892 as double)), partial_sum(cast(bid_price#13872 as double)),
partial_sum(cast(noads#13900 as double)), partial_sum(cast(e#13889 as double)), partial_sum(cast(g#13896
as double)), partial_sum(publisher_revenue#13903), ... 5 more fields]) 
         +- *Project [actual_dsp_fee#13863, actual_pgm_fee#13864, actual_ssp_fee#13865,
advertiser_id#13866, advertiser_spent#13867, bid_price#13872, click_prob#13874, clicks#13875L,
clicks_static#13876L, conv_prob#13877, conversions#13879L, dd_convs#13881L, decisions#13882,
dmp_liveramp_margin#13883, dmp_liveramp_payout#13884, dmp_nielsen_margin#13885, dmp_nielsen_payout#13886,
dmp_rapleaf_margin#13887, dmp_rapleaf_payout#13888, e#13889, expected_payout#13892, first_impressions#13893L,
fraud_clicks#13894L, fraud_impressions#13895L, ... 6 more fields] 
            +- *BatchedScan parquet slicer.573_slicer_rnd_13[actual_dsp_fee#13863,actual_pgm_fee#13864,actual_ssp_fee#13865,advertiser_id#13866,advertiser_spent#13867,bid_price#13872,click_prob#13874,clicks#13875L,clicks_static#13876L,conv_prob#13877,conversions#13879L,dd_convs#13881L,decisions#13882,dmp_liveramp_margin#13883,dmp_liveramp_payout#13884,dmp_nielsen_margin#13885,dmp_nielsen_payout#13886,dmp_rapleaf_margin#13887,dmp_rapleaf_payout#13888,e#13889,expected_payout#13892,first_impressions#13893L,fraud_clicks#13894L,fraud_impressions#13895L,...
8 more fields] Format: ParquetFormat, InputPaths: hdfs://spark-master1.uslicer.net:8020/user/hive/warehouse/slicer.db/573_slicer...,
PushedFilters: [], ReadSchema: struct<actual_dsp_fee:float,actual_pgm_fee:float,actual_ssp_fee:float,advertiser_id:int,advertise...



0: jdbc:hive2://spark-master1.uslicer> SELECT `advertiser_id` AS `advertiser_id`,
SUM(`conversions`) AS `conversions`, SUM(`dmp_rapleaf_margin`) AS `dmp_rapleaf_margin`,
SUM(`pvc`) AS `pvc`, SUM(`dmp_nielsen_payout`) AS `dmp_nielsen_payout`,
SUM(`fraud_clicks`) AS `fraud_clicks`, SUM(`impressions`) AS `impressions`,
SUM(`conv_prob`) AS `conv_prob`, SUM(`dmp_liveramp_payout`) AS `dmp_liveramp_payout`,
SUM(`decisions`) AS `decisions`, SUM(`fraud_impressions`) AS `fraud_impressions`,
SUM(`advertiser_spent`) AS `advertiser_spent`, SUM(`actual_ssp_fee`) AS `actual_ssp_fee`,
SUM(`dmp_nielsen_margin`) AS `dmp_nielsen_margin`, SUM(`first_impressions`) AS `first_impressions`,
SUM(`clicks`) AS `clicks`, SUM(`second_price`) AS `second_price`,
SUM(`click_prob`) AS `click_prob`, SUM(`clicks_static`) AS `clicks_static`,
SUM(`expected_payout`) AS `expected_payout`, SUM(`bid_price`) AS `bid_price`,
SUM(`noads`) AS `noads`, SUM(`e`) AS `e`, SUM(`g`) AS `g`,
SUM(`publisher_revenue`) AS `publisher_revenue`, SUM(`dmp_liveramp_margin`) AS `dmp_liveramp_margin`,
SUM(`actual_pgm_fee`) AS `actual_pgm_fee`, SUM(`dmp_rapleaf_payout`) AS `dmp_rapleaf_payout`,
SUM(`dd_convs`) AS `dd_convs`, SUM(`actual_dsp_fee`) AS `actual_dsp_fee`
FROM `slicer`.`573_slicer_rnd_13` WHERE dt = '2016-07-28' GROUP BY `advertiser_id` LIMIT 30;

(results for three runs) 
30 rows selected (11.904 seconds) 
30 rows selected (11.703 seconds) 
30 rows selected (11.52 seconds) 

XXX 

0: jdbc:hive2://spark-master1.uslicer> EXPLAIN SELECT `advertiser_id` AS `advertiser_id`,
SUM(`conversions`) AS `conversions`, SUM(`dmp_rapleaf_margin`) AS `dmp_rapleaf_margin`,
SUM(`pvc`) AS `pvc`, SUM(`dmp_nielsen_payout`) AS `dmp_nielsen_payout`,
SUM(`fraud_clicks`) AS `fraud_clicks`, SUM(`impressions`) AS `impressions`,
SUM(`conv_prob`) AS `conv_prob`, SUM(`dmp_liveramp_payout`) AS `dmp_liveramp_payout`,
SUM(`decisions`) AS `decisions`, SUM(`fraud_impressions`) AS `fraud_impressions`,
SUM(`advertiser_spent`) AS `advertiser_spent`, SUM(`actual_ssp_fee`) AS `actual_ssp_fee`,
SUM(`dmp_nielsen_margin`) AS `dmp_nielsen_margin`, SUM(`first_impressions`) AS `first_impressions`,
SUM(`clicks`) AS `clicks`, SUM(`second_price`) AS `second_price`,
SUM(`click_prob`) AS `click_prob`, SUM(`clicks_static`) AS `clicks_static`,
SUM(`expected_payout`) AS `expected_payout`, SUM(`bid_price`) AS `bid_price`,
SUM(`noads`) AS `noads`, SUM(`e`) AS `e`, SUM(`g`) AS `g`,
SUM(`publisher_revenue`) AS `publisher_revenue`, SUM(`dmp_liveramp_margin`) AS `dmp_liveramp_margin`,
SUM(`actual_pgm_fee`) AS `actual_pgm_fee`, SUM(`dmp_rapleaf_payout`) AS `dmp_rapleaf_payout`,
SUM(`dd_convs`) AS `dd_convs`
FROM `slicer`.`573_slicer_rnd_13` WHERE dt = '2016-07-28' GROUP BY `advertiser_id` LIMIT 30;
plan  == Physical Plan == 
CollectLimit 30 
+- *HashAggregate(keys=[advertiser_id#15269], functions=[sum(conversions#15282L), sum(cast(dmp_rapleaf_margin#15290
as double)), sum(pvc#15307L), sum(cast(dmp_nielsen_payout#15289 as double)), sum(fraud_clicks#15297L),
sum(cast(impressions#15300 as double)), sum(cast(conv_prob#15280 as double)), sum(cast(dmp_liveramp_payout#15287
as double)), sum(cast(decisions#15285 as double)), sum(fraud_impressions#15298L), sum(advertiser_spent#15270),
sum(cast(actual_ssp_fee#15268 as double)), sum(cast(dmp_nielsen_margin#15288 as double)),
sum(first_impressions#15296L), sum(clicks#15278L), sum(cast(second_price#15308 as double)),
sum(cast(click_prob#15277 as double)), sum(clicks_static#15279L), sum(cast(expected_payout#15295
as double)), sum(cast(bid_price#15275 as double)), sum(cast(noads#15303 as double)), sum(cast(e#15292
as double)), sum(cast(g#15299 as double)), sum(publisher_revenue#15306), ... 4 more fields])
   +- Exchange hashpartitioning(advertiser_id#15269, 3) 
      +- *HashAggregate(keys=[advertiser_id#15269], functions=[partial_sum(conversions#15282L),
partial_sum(cast(dmp_rapleaf_margin#15290 as double)), partial_sum(pvc#15307L), partial_sum(cast(dmp_nielsen_payout#15289
as double)), partial_sum(fraud_clicks#15297L), partial_sum(cast(impressions#15300 as double)),
partial_sum(cast(conv_prob#15280 as double)), partial_sum(cast(dmp_liveramp_payout#15287 as
double)), partial_sum(cast(decisions#15285 as double)), partial_sum(fraud_impressions#15298L),
partial_sum(advertiser_spent#15270), partial_sum(cast(actual_ssp_fee#15268 as double)), partial_sum(cast(dmp_nielsen_margin#15288
as double)), partial_sum(first_impressions#15296L), partial_sum(clicks#15278L), partial_sum(cast(second_price#15308
as double)), partial_sum(cast(click_prob#15277 as double)), partial_sum(clicks_static#15279L),
partial_sum(cast(expected_payout#15295 as double)), partial_sum(cast(bid_price#15275 as double)),
partial_sum(cast(noads#15303 as double)), partial_sum(cast(e#15292 as double)), partial_sum(cast(g#15299
as double)), partial_sum(publisher_revenue#15306), ... 4 more fields]) 
         +- *Project [actual_pgm_fee#15267, actual_ssp_fee#15268, advertiser_id#15269,
advertiser_spent#15270, bid_price#15275, click_prob#15277, clicks#15278L, clicks_static#15279L,
conv_prob#15280, conversions#15282L, dd_convs#15284L, decisions#15285, dmp_liveramp_margin#15286,
dmp_liveramp_payout#15287, dmp_nielsen_margin#15288, dmp_nielsen_payout#15289, dmp_rapleaf_margin#15290,
dmp_rapleaf_payout#15291, e#15292, expected_payout#15295, first_impressions#15296L, fraud_clicks#15297L,
fraud_impressions#15298L, g#15299, ... 5 more fields] 
            +- *BatchedScan parquet slicer.573_slicer_rnd_13[actual_pgm_fee#15267,actual_ssp_fee#15268,advertiser_id#15269,advertiser_spent#15270,bid_price#15275,click_prob#15277,clicks#15278L,clicks_static#15279L,conv_prob#15280,conversions#15282L,dd_convs#15284L,decisions#15285,dmp_liveramp_margin#15286,dmp_liveramp_payout#15287,dmp_nielsen_margin#15288,dmp_nielsen_payout#15289,dmp_rapleaf_margin#15290,dmp_rapleaf_payout#15291,e#15292,expected_payout#15295,first_impressions#15296L,fraud_clicks#15297L,fraud_impressions#15298L,g#15299,...
7 more fields] Format: ParquetFormat, InputPaths: hdfs://spark-master1.uslicer.net:8020/user/hive/warehouse/slicer.db/573_slicer...,
PushedFilters: [], ReadSchema: struct<actual_pgm_fee:float,actual_ssp_fee:float,advertiser_id:int,advertiser_spent:double,bid_pr...



0: jdbc:hive2://spark-master1.uslicer> SELECT `advertiser_id` AS `advertiser_id`,
SUM(`conversions`) AS `conversions`, SUM(`dmp_rapleaf_margin`) AS `dmp_rapleaf_margin`,
SUM(`pvc`) AS `pvc`, SUM(`dmp_nielsen_payout`) AS `dmp_nielsen_payout`,
SUM(`fraud_clicks`) AS `fraud_clicks`, SUM(`impressions`) AS `impressions`,
SUM(`conv_prob`) AS `conv_prob`, SUM(`dmp_liveramp_payout`) AS `dmp_liveramp_payout`,
SUM(`decisions`) AS `decisions`, SUM(`fraud_impressions`) AS `fraud_impressions`,
SUM(`advertiser_spent`) AS `advertiser_spent`, SUM(`actual_ssp_fee`) AS `actual_ssp_fee`,
SUM(`dmp_nielsen_margin`) AS `dmp_nielsen_margin`, SUM(`first_impressions`) AS `first_impressions`,
SUM(`clicks`) AS `clicks`, SUM(`second_price`) AS `second_price`,
SUM(`click_prob`) AS `click_prob`, SUM(`clicks_static`) AS `clicks_static`,
SUM(`expected_payout`) AS `expected_payout`, SUM(`bid_price`) AS `bid_price`,
SUM(`noads`) AS `noads`, SUM(`e`) AS `e`, SUM(`g`) AS `g`,
SUM(`publisher_revenue`) AS `publisher_revenue`, SUM(`dmp_liveramp_margin`) AS `dmp_liveramp_margin`,
SUM(`actual_pgm_fee`) AS `actual_pgm_fee`, SUM(`dmp_rapleaf_payout`) AS `dmp_rapleaf_payout`,
SUM(`dd_convs`) AS `dd_convs`
FROM `slicer`.`573_slicer_rnd_13` WHERE dt = '2016-07-28' GROUP BY `advertiser_id` LIMIT 30;

(results for three runs) 
30 rows selected (2.158 seconds) 
30 rows selected (1.83 seconds) 
30 rows selected (1.979 seconds) 
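To put a rough number on the gap, here is a short Python calculation that averages the three Beeline timings for each query and takes their ratio. It is plain arithmetic on the figures above, not a benchmark. The variable names are illustrative only.

```python
from statistics import mean

# Wall-clock times reported by Beeline, in seconds (three runs each).
slow_29_aggregates = [11.904, 11.703, 11.52]  # query including SUM(`actual_dsp_fee`)
fast_28_aggregates = [2.158, 1.83, 1.979]     # same query without it

slow = mean(slow_29_aggregates)
fast = mean(fast_28_aggregates)
ratio = slow / fast

# prints: 29 aggregates: 11.709 s, 28 aggregates: 1.989 s, ratio: 5.9x
print(f"29 aggregates: {slow:.3f} s, 28 aggregates: {fast:.3f} s, ratio: {ratio:.1f}x")
```

On these three runs each, the 29-aggregate query takes roughly 5.9 times as long as the 28-aggregate one.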

Sergei Romanov.