spark-user mailing list archives

From Lian Jiang <jiangok2...@gmail.com>
Subject Re: pandas_udf is very slow
Date Sun, 05 Apr 2020 16:50:51 GMT
Thanks Silvio. I need a grouped map pandas UDF, which takes a Spark data frame as input and
outputs a Spark data frame with a different shape than the input. Grouped map is fairly unique
to pandas UDFs, and I am having trouble finding an equivalent non-pandas UDF for an
apples-to-apples comparison. Let me know if you have a better idea for investigating the
grouped map pandas UDF slowness.

One potential workaround could be grouping the 250M records by id and, for each group, doing
groupby('id').apply(pd_udf). I am not sure which way is more promising compared with
repartition + mapPartitions, reduceByKey, or combineByKey.
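
For what it's worth, a rough sketch of the repartition + mapPartitions direction (untested;
process_partition is only a placeholder for the real per-group logic, and it assumes the rows
sharing an id fit comfortably in one partition's memory as a pandas DataFrame):

import pandas as pd
from pyspark.sql import Row

def process_partition(rows):
    # rows is an iterator over the Rows of one partition; after
    # repartition(140, "id") all rows with the same id land in the same partition
    pdf = pd.DataFrame([r.asDict() for r in rows])
    if pdf.empty:
        return
    for gid, group in pdf.groupby("id"):
        # placeholder for the real per-group calculation
        yield Row(id=int(gid))

df_out = (df.repartition(140, "id")
            .rdd
            .mapPartitions(process_partition)
            .toDF())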

Appreciate any clue.

Sent from my iPhone

> On Apr 5, 2020, at 6:18 AM, Silvio Fiorito <silvio.fiorito@granturing.com> wrote:
> 
> 
> Your 2 examples are doing different things.
>  
> The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an aggregate.
>  
> I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your result the same?
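> 
> Something along these lines (just a sketch, assuming the per-group work can be expressed as
> reducing the txt values of each id to a single scalar; the "hello" body is a stand-in):
> 
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> 
> @pandas_udf("string", PandasUDFType.GROUPED_AGG)
> def agg_func(txt):
>     # txt is a pandas Series holding all txt values of one id group;
>     # return a single scalar per group (placeholder logic)
>     return "hello"
> 
> df2 = df.groupBy("id").agg(agg_func(df["txt"]).alias("udfadd"))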
>  
> From: Lian Jiang <jiangok2006@gmail.com>
> Date: Sunday, April 5, 2020 at 3:28 AM
> To: user <user@spark.apache.org>
> Subject: pandas_udf is very slow
>  
> Hi,
>  
> I am using pandas UDFs in PySpark 2.4.3 on EMR 5.21.0. Pandas UDFs are favored over non-pandas
> UDFs per https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.
>  
> My data has about 250M records and the pandas UDF code looks like this:
>  
> import pandas as pd
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> 
> def pd_udf_func(data):
>     # trivially return a one-row DataFrame of None values matching the "id int" schema
>     return pd.DataFrame({"id": [None]})
> 
> pd_udf = pandas_udf(
>             pd_udf_func,
>             returnType="id int",
>             functionType=PandasUDFType.GROUPED_MAP
>         )
> df3 = df.groupBy("id").apply(pd_udf)
> df3.explain()
> """
> == Physical Plan ==
> FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
> +- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#9L, 200)
>       +- *(1) Project [id#9L, id#9L, txt#10]
>          +- Scan ExistingRDD[id#9L,txt#10]
> """
>  
> As you can see, this pandas UDF does nothing but return a one-row pandas DataFrame of None
> values. In reality the pandas UDF has complicated logic (e.g. iterating through the rows of the
> pandas DataFrame and doing some calculation); the simplification is just to cut out noise from
> application-specific logic. This pandas UDF takes hours to run on 10 executors (14 cores and
> 64 GB of memory each). On the other hand, the non-pandas UDF below finishes in minutes:
>  
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType
> 
> def udf_func(data_list):
>     return "hello"
> 
> hello_udf = F.udf(udf_func, StringType())
> df2 = (df.groupBy("id")
>          .agg(F.collect_list('txt').alias('txt1'))
>          .withColumn('udfadd', hello_udf('txt1')))
> df2.explain()
> """
> == Physical Plan ==
> *(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
> +- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
>    +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 0)])
>       +- Exchange hashpartitioning(id#9L, 200)
>          +- ObjectHashAggregate(keys=[id#9L], functions=[partial_collect_list(txt#10, 0, 0)])
>             +- Scan ExistingRDD[id#9L,txt#10]
> """
>  
> The physical plans show that the pandas UDF plan goes through a sort (slower), while the
> non-pandas UDF uses ObjectHashAggregate (faster).
>  
> Below is what I have tried to improve the pandas UDF's performance, but none of it worked:
> 1. Repartition before the groupBy, e.g. df.repartition(140, "id").groupBy("id").apply(pd_udf),
> where 140 equals spark.sql.shuffle.partitions. I hoped the groupBy could benefit from the
> repartition, but according to the execution plan the repartition seems to be ignored, since the
> groupBy does its own partitioning.
> 
> 
> 2. Although the slowness is more likely caused by the pandas UDF than by the groupBy, I still
> played with shuffle settings such as spark.shuffle.compress=true and
> spark.shuffle.spill.compress=true.
> 
> 
> 3. I played with serde settings such as spark.serializer=org.apache.spark.serializer.KryoSerializer.
> I also tried pyarrow settings such as spark.sql.execution.arrow.enabled=true and
> spark.sql.execution.arrow.maxRecordsPerBatch=100000 (one way to set these is sketched right
> after this list).
> 
> 
> 4. I tried to replace the "groupBy + pandas UDF" solution with combineByKey, reduceByKey, or
> repartition + mapPartitions, but it is not easy since the pandas UDF has complicated logic.
> 
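> One way these settings can be applied (a sketch, assuming everything is set when the session is
> built; spark.serializer only takes effect if it is set before the SparkContext starts, e.g. here
> or via spark-submit --conf):
> 
> from pyspark.sql import SparkSession
> 
> spark = (SparkSession.builder
>          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>          .config("spark.sql.execution.arrow.enabled", "true")
>          .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100000")
>          .getOrCreate())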
>  
> My questions:
>  
> 1. Why is the pandas UDF so slow?
> 2. Is there a way to improve the performance of pandas_udf?
> 3. In case this is a known issue with pandas UDFs, what other remedies can I use? I guess I
> need to think harder about combineByKey, reduceByKey, and repartition + mapPartitions, but I
> want to know if I missed anything obvious.
>  
> Any clue is highly appreciated.
>  
> Thanks
> Leon
>  
>  
>  
>  
