spark-user mailing list archives

From Tianlang <tianlangstu...@aliyun.com.INVALID>
Subject Re: help understanding physical plan
Date Thu, 15 Aug 2019 06:18:59 GMT
Hi,

Maybe you can look at the Spark UI. The physical plan does not contain any 
information about how long each step takes.
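
If it helps, the per-stage timings shown in the Spark UI can also be read 
from Spark's monitoring REST API (under /api/v1 on the driver UI or the 
history server). Below is a minimal Python sketch, not a tested tool. It 
assumes the UI is reachable at some base URL (for example 
http://localhost:4040, which is an assumption here) and that you know the 
application ID. The helper names fetch_stages and slowest_stages are 
hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple
from urllib.request import urlopen


def fetch_stages(base_url: str, app_id: str) -> List[Dict[str, Any]]:
    """Read stage data from Spark's monitoring REST API.

    base_url is an assumption: e.g. the driver UI (default port 4040) or the
    history server (default port 18080).
    """
    with urlopen(f"{base_url}/api/v1/applications/{app_id}/stages") as resp:
        return json.load(resp)


def slowest_stages(stages: List[Dict[str, Any]], top: int = 5) -> List[Tuple[int, str, int]]:
    """Return (stageId, name, executorRunTime) for the `top` slowest stages.

    executorRunTime is the executor run time summed over the stage's tasks,
    reported in milliseconds.
    """
    ranked = sorted(stages, key=lambda s: s.get("executorRunTime", 0), reverse=True)
    return [(s["stageId"], s.get("name", ""), s.get("executorRunTime", 0)) for s in ranked[:top]]
```

Usage would be something like slowest_stages(fetch_stages(base_url, app_id)). 
The stage names usually point back to the action and call site, which makes 
it easier to tell which part of the plan is eating the time.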

On 2019/8/13 10:45 PM, Marcelo Valle wrote:
> Hi,
>
> I have a job running on AWS EMR. It's basically a join between 2 
> tables (Parquet files on S3), one fairly large (around 50 GB) and 
> the other small (less than 1 GB).
> The small table is the result of other operations, but it is a 
> DataFrame persisted with `.persist(StorageLevel.MEMORY_AND_DISK_SER)`, 
> and a count on this DataFrame finishes quickly.
> When I run my "LEFT_ANTI" join, I get the execution plan below. 
> While most of my jobs on large amounts of data take at most 1 h on this 
> cluster, this one takes almost 1 day to complete.
>
> What could I be doing wrong? I am trying to analyze the plan, but I 
> can't find anything that justifies the slowness. It has 2 shuffles 
> followed by a zip, but other jobs have similar things and they are not 
> that slow.
>
> Could anyone point me to possible actions I could take to investigate 
> this?
>
> Thanks,
> Marcelo.
>
> == Physical Plan ==
> *(2) Project [USAGE_AGGREGATED_METADATA_ID#1493, SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702, USAGE_AGGREGATED_METADATA_HASH#1513]
> +- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ), coalesce(SENDER_RECORDING_IDENTIFIER#1499, )], [coalesce(USAGE_AGGREGATED_METADATA_ID#356, ), coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight, ((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493) && (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
>    :- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493, SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
>    :     +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493, ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497, WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499, RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502, USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
>    :           +- *(2) Project [ID#328 AS USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498, uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null, artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS USAGE_AGGREGATED_METADATA_HASH#1513]
>    :              +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ), coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ), coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )], [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ), coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner, BuildLeft, (((((isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) && (track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=> artist_name_1#1422)) && (work_writer_names#293 <=> work_writer_names_1#1423))
>    :                 :- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), coalesce(input[2, string, true], ), coalesce(input[3, string, true], ), coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
>    :                 :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419, iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421, artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS work_writer_names_1#1423]
>    :                 :     +- *(1) Filter isnotnull(ID#328)
>    :                 :        +- InMemoryTableScan [ID#328, artist_name#292, isrc#289, iswc#290, track_name#291, work_writer_names#293], [isnotnull(ID#328)]
>    :                 :              +- InMemoryRelation [ID#328, isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293], StorageLevel(disk, memory, 1 replicas)
>    :                 :                    +- *(2) Project [ID#328, isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
>    :                 :                       +- *(2) BroadcastHashJoin [coalesce(ISRC#329, ), coalesce(ISWC#330, ), coalesce(RECORDING_TITLE#331, ), coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, )], [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ), coalesce(substring(artist_name#292, 0, 1000), ), coalesce(work_writer_names#293, )], RightOuter, BuildLeft, (((((isrc#289 <=> ISRC#329) && (iswc#290 <=> ISWC#330)) && (track_name#291 <=> RECORDING_TITLE#331)) && (substring(artist_name#292, 0, 1000) <=> RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=> WORK_WRITERS#333))
>    :                 :                          :- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), coalesce(input[2, string, true], ), coalesce(input[3, string, true], ), coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
>    :                 :                          :  +- *(1) Project [ID#328, ISRC#329, ISWC#330, RECORDING_TITLE#331, RECORDING_DISPLAY_ARTIST#332, WORK_WRITERS#333]
>    :                 :                          :     +- *(1) Filter ((isnull(WORK_TITLE#334) && isnull(RECORDING_VERSION_TITLE#335)) && (CONTENT_TYPE#336 <=> SOUND))
>    :                 :                          :        +- *(1) FileScan parquet [ID#328,ISRC#329,ISWC#330,RECORDING_TITLE#331,RECORDING_DISPLAY_ARTIST#332,WORK_WRITERS#333,WORK_TITLE#334,RECORDING_VERSION_TITLE#335,CONTENT_TYPE#336] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua..., PartitionFilters: [], PushedFilters: [IsNull(WORK_TITLE), IsNull(RECORDING_VERSION_TITLE), EqualNullSafe(CONTENT_TYPE,SOUND)], ReadSchema: struct<ID:string,ISRC:string,ISWC:string,RECORDING_TITLE:string,RECORDING_DISPLAY_ARTIST:string,W...
>    :                 :                          +- *(2) FileScan parquet [isrc#289,iswc#290,track_name#291,artist_name#292,work_writer_names#293] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<isrc:string,iswc:string,track_name:string,artist_name:string,work_writer_names:string>
>    :                 +- *(2) FileScan parquet [uri#286,isrc#289,iswc#290,track_name#291,artist_name#292,work_writer_names#293] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<uri:string,isrc:string,iswc:string,track_name:string,artist_name:string,work_writer_names:...
>    +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), coalesce(input[1, string, true], )))
>       +- *(1) FileScan parquet [USAGE_AGGREGATED_METADATA_ID#356,SENDER_RECORDING_IDENTIFIER#357] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/marcelo.valle/git/amra-cloud-usage-ingestion/target/test-classes/ua..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<USAGE_AGGREGATED_METADATA_ID:string,SENDER_RECORDING_IDENTIFIER:string>
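
For reference, the top-level BroadcastHashJoin above (LeftAnti, BuildRight) 
roughly works like this. The small right side is hashed once on the 
coalesce(col, ) keys, where NULL is replaced by the empty string. Each left 
row then probes that table, and the residual <=> (null-safe equality) 
condition is re-checked against the rows in the matching bucket. A left row 
is kept only if nothing matches. Below is a simplified single-machine Python 
sketch of that idea. It makes no claim to match Spark's actual 
implementation. Rows are plain tuples, None stands for NULL, and the 
function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple

# Hypothetical row model: a tuple of nullable strings (None plays SQL NULL).
Row = Tuple[Optional[str], ...]


def null_safe_eq(a: Optional[str], b: Optional[str]) -> bool:
    """SQL `a <=> b`: true if both are NULL, or both are non-NULL and equal."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def hash_key(row: Row, cols: Sequence[int]) -> Tuple[str, ...]:
    """Mirror of the plan's coalesce(col, ) keys: NULL becomes ''."""
    return tuple("" if row[c] is None else row[c] for c in cols)


def broadcast_left_anti_join(
    left: List[Row],
    right: List[Row],
    left_cols: Sequence[int],
    right_cols: Sequence[int],
) -> List[Row]:
    # "BuildRight": hash the small right side once.
    table: Dict[Tuple[str, ...], List[Row]] = defaultdict(list)
    for r in right:
        table[hash_key(r, right_cols)].append(r)

    result: List[Row] = []
    for row in left:
        # Probe with the same coalesced key. NULL and '' share a bucket,
        # so the residual <=> condition is re-checked for every candidate.
        bucket = table.get(hash_key(row, left_cols), [])
        matched = any(
            all(null_safe_eq(row[lc], cand[rc]) for lc, rc in zip(left_cols, right_cols))
            for cand in bucket
        )
        if not matched:  # LeftAnti keeps only rows without a match
            result.append(row)
    return result


if __name__ == "__main__":
    left = [("a", "x"), (None, "y"), ("", "y"), ("b", "z")]
    right = [("a", "x"), (None, "y")]
    print(broadcast_left_anti_join(left, right, (0, 1), (0, 1)))
    # [('', 'y'), ('b', 'z')]
```

In this sketch, a key column that is mostly NULL or empty puts many build 
rows into a single bucket. Every probe row that lands in that bucket then 
has to re-check all of them with <=>. The same applies to the inner joins 
with five coalesced keys further down the plan.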
-- 

TianlangStudio <https://edu.51cto.com/sd/aca72>

<https://github.com/TianLangStudio>


