spark-user mailing list archives

From Abdeali Kothari <>
Subject Identifying cause of exception in PySpark
Date Mon, 10 Dec 2018 03:40:07 GMT
I have an issue in Spark where I create a Spark worker process that
listens for jobs from another machine. After about 24 hours and ~3000
jobs, some jobs in my Spark worker just hang indefinitely.

I am trying to set a timeout for my tasks so that the Spark session can be
stopped and restarted if a job takes more than an hour or so.

To do this, I send a signal and raise an exception, similar to this
library. This works well in plain Python, but not in PySpark.
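For reference, the signal-based timeout I am describing looks roughly like
this (a minimal sketch; the `JobTimeout` and `run_with_timeout` names are
mine, not from any library):

```python
import signal

class JobTimeout(Exception):
    """Raised when the alarm fires before the wrapped call returns."""

def _on_alarm(signum, frame):
    raise JobTimeout("job exceeded the time limit")

def run_with_timeout(fn, seconds):
    # Install a SIGALRM handler that raises JobTimeout, arm the alarm,
    # run the job, then always disarm and restore the old handler.
    # SIGALRM exists only on Unix and fires only in the main thread.
    old_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        return fn()
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)
```

In plain Python, catching `JobTimeout` around `run_with_timeout(...)` is
enough to tell a timeout apart from any other failure.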

When the timeout signal is sent in PySpark, py4j seems to catch it and
throws a py4j.protocol.Py4JError - and hence I cannot figure out whether
the error was caused by a timeout or something else.

I am wondering how I can figure out what caused the original exception in
PySpark. Here is some example code that throws a similar error; in my
`except` block I am unable to tell whether the error was caused by MyExc
or something else:

import pyspark
from pyspark.sql import functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [['a1', 'b1', 'c1', 1],
     ['a2', 'b2', 'c2', 2],
     ['a3', 'b3', 'c3', 3]],
    ['a', 'b', 'c', 'x'])

class MyExc(Exception):
    pass

@F.udf('int')
def myudf(x):
    raise MyExc("my exception")
    return x

try:
    df = df.withColumn("x2", myudf(df['x']))
    df.show()  # force evaluation so the UDF actually runs
except Exception as err:
    print("Got err", type(err), err)
    # import ipdb; ipdb.set_trace()
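One crude workaround I have considered: the Python worker's traceback,
including the original exception class name, usually survives as plain
text inside the message of the error the driver sees, so the message can
be scanned for the name. A sketch with a simulated message (the message
text and the `caused_by` helper are my own illustration, not PySpark API):

```python
# Simulated text of the error the driver sees: the Python worker's
# traceback is flattened into the Java exception message, so the
# original exception class name survives as plain text.
simulated_msg = (
    "An error occurred while calling o65.showString.\n"
    ": org.apache.spark.SparkException: Job aborted due to stage failure\n"
    "Traceback (most recent call last):\n"
    '  File "example.py", line 18, in myudf\n'
    "MyExc: my exception\n"
)

def caused_by(err_text, exc_name):
    # Crude but practical: report whether the flattened message
    # mentions the original exception class name.
    return exc_name in err_text

print(caused_by(simulated_msg, "MyExc"))     # True
print(caused_by(simulated_msg, "KeyError"))  # False
```

String matching like this is obviously fragile, which is why I would
prefer a proper way to recover the original exception type.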
