spark-user mailing list archives

From nicktgr15 <nicktg...@gmail.com>
Subject Pyspark UDF causing ExecutorLostFailure
Date Thu, 07 Sep 2017 09:16:39 GMT
Hi,
I'm using Spark 2.1.0 on AWS EMR (YARN) and trying to use a UDF in Python as
follows:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

path = 's3://some/parquet/dir/myfile.parquet'
df = spark.read.load(path)
def _test_udf(useragent):
    return useragent.upper()

test_udf = udf(_test_udf, StringType())
df = df.withColumn('test_field', test_udf(col('std_useragent')))
df.write.parquet('/output.parquet')

The following config is used in spark-defaults.conf (using
maximizeResourceAllocation in EMR):

...
spark.executor.instances         4
spark.executor.cores             8
spark.driver.memory              8G
spark.executor.memory            9658M
spark.default.parallelism        64
spark.driver.maxResultSize       3G
...

The cluster has 4 worker nodes (+1 master), each with the following specs:
8 vCPU, 15 GiB memory, 160 GB SSD storage.
The above example fails every single time with errors like the following:

17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50,
ip-172-31-7-125.eu-west-1.compute.internal, executor 10):
ExecutorLostFailure (executor 10 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of
10.4 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

I tried increasing spark.yarn.executor.memoryOverhead to 3000 (MB), which
delays the errors, but they still appear before the end of the job and the
job ultimately fails.
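
For reference, the 10.4 GB limit in the log is consistent with the executor memory above plus Spark's default overhead. The sketch below is a back-of-the-envelope calculation, not Spark code. It assumes the Spark 2.1 default of max(384 MB, 10% of executor memory) for spark.yarn.executor.memoryOverhead and ignores any rounding YARN applies to the container size. The helper names are hypothetical. It also divides the overhead by the number of executor cores, on the assumption that each concurrently running task uses its own Python worker process whose memory sits outside the JVM heap and counts against the container.

```python
# Back-of-the-envelope YARN container sizing for the configuration in this post.
# Assumption: Spark 2.1 defaults, overhead = max(384 MB, 10% of executor memory).
# YARN may round the container size up to its allocation increment; ignored here.

OVERHEAD_FACTOR = 0.10   # assumed Spark 2.1 default factor
OVERHEAD_MIN_MB = 384    # assumed Spark 2.1 default minimum


def container_limit_mb(executor_memory_mb, overhead_mb=None):
    """Return (overhead_mb, total_mb) requested from YARN per executor."""
    if overhead_mb is None:
        # No explicit memoryOverhead: fall back to the assumed default rule.
        overhead_mb = max(int(executor_memory_mb * OVERHEAD_FACTOR), OVERHEAD_MIN_MB)
    return overhead_mb, executor_memory_mb + overhead_mb


def overhead_per_core_mb(overhead_mb, executor_cores):
    """Overhead budget per concurrently running task (one Python worker each)."""
    return overhead_mb / executor_cores


if __name__ == "__main__":
    # 9658 MB executor memory and 8 cores, as in spark-defaults.conf above.
    for label, explicit in (("default", None), ("memoryOverhead=3000", 3000)):
        overhead, total = container_limit_mb(9658, explicit)
        print(f"{label}: overhead={overhead} MB, container={total} MB "
              f"({total / 1024:.1f} GB), "
              f"per-core overhead={overhead_per_core_mb(overhead, 8):.0f} MB")
```

With the values from this post, the default case gives 965 MB of overhead and a 10623 MB (about 10.4 GB) container, which matches the limit in the log. Under the stated assumption, that leaves roughly 120 MB of non-heap memory per Python worker, and 375 MB with the 3000 setting.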

If I run the above job in Scala, everything works as expected (without having
to adjust the memoryOverhead):

import org.apache.spark.sql.functions.{col, udf}

val upper: String => String = _.toUpperCase
val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
val upperUDF = udf(upper)
val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
newdf.write.parquet("/output.parquet")

CPU utilisation is also very poor with PySpark.

Is this a known bug with PySpark and UDFs, or is it a matter of bad
configuration?

I've also raised a ticket in the Apache JIRA,
https://issues.apache.org/jira/browse/SPARK-21935, which includes some
screenshots.



