I have an issue in Spark where I create a Spark worker process that listens for jobs from another machine. After about 24 hours and ~3000 jobs, some jobs in my Spark worker just hang indefinitely.

I am trying to set a timeout for my tasks so that the Spark session can be stopped and restarted if a job takes more than an hour or so.

To do this, I send a signal and raise an exception, similar to the timeout-decorator library: https://github.com/pnpnpn/timeout-decorator
This works well in plain Python, but not in PySpark.
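
For reference, the timeout mechanism I use looks roughly like this. This is a minimal sketch of the signal-based approach; `run_with_timeout` and `run_job` are illustrative names, not the exact code from my worker:

```
import signal

class JobTimeout(Exception):
    pass

def _on_timeout(signum, frame):
    # Raised inside whatever code happens to be running when the alarm fires
    raise JobTimeout("job exceeded time limit")

def run_with_timeout(func, args=(), kwargs=None, seconds=3600):
    """Run func, raising JobTimeout if it runs longer than `seconds`.
    Uses SIGALRM, so it only works in the main thread on Unix."""
    kwargs = kwargs or {}
    old_handler = signal.signal(signal.SIGALRM, _on_timeout)
    signal.alarm(seconds)
    try:
        return func(*args, **kwargs)
    finally:
        signal.alarm(0)                        # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)

# Intended usage in the worker loop (illustrative):
# try:
#     run_with_timeout(run_job, args=(job,), seconds=3600)
# except JobTimeout:
#     spark.stop()
#     spark = pyspark.sql.SparkSession.builder.getOrCreate()
```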

When the timeout signal fires in PySpark, py4j seems to catch the resulting exception and throws a `py4j.protocol.Py4JError` instead, so I cannot tell whether the error was caused by the timeout or by something else.

I am wondering how I can figure out what caused the original exception in PySpark. Here is some example code that throws a similar error; in the `except` block I am unable to tell whether the error was caused by `MyExc` or by something else:

```
import pyspark
from pyspark.sql import functions as F
spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [['a1', 'b1', 'c1', 1],
     ['a2', 'b2', 'c2', 2],
     ['a3', 'b3', 'c3', 3]],
    ['a', 'b', 'c', 'x'])

class MyExc(Exception):
    pass

@pyspark.sql.functions.udf
def myudf(x):
    raise MyExc("my exception")
    return x

df = df.withColumn("x2", myudf(df['x']))
try:
    df.show()
except Exception as err:
    print("Got err", type(err), err)
    # import ipdb; ipdb.set_trace()
    raise
```
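
The only thing I can think of in the `except` block is string-matching against the wrapped error, which feels fragile, hence this question. A rough sketch of what I mean, continuing the example above and assuming the wrapped error's message still contains the original Python traceback text:

```
try:
    df.show()
except Exception as err:
    # Assumption: the Py4J/Spark error message usually includes the Python
    # worker's traceback as plain text, so the original class name may show
    # up in str(err). This is string matching, not a reliable API.
    if "MyExc" in str(err):
        print("Looks like the UDF raised MyExc")
    else:
        print("Some other failure:", type(err), err)
```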