spark-user mailing list archives

From Alex <>
Subject [PySpark Profiler]: Does empty profile mean no execution in Python Interpreter?
Date Fri, 02 Nov 2018 03:00:34 GMT

I ran into an interesting scenario with no profile output today. I have
a PySpark application that primarily uses the Spark SQL APIs. I
understand that parts of the Spark SQL API may not generate data in the
PySpark profile dumps, but I was surprised when code containing a UDF
did not generate any profile output. I had thought that any time I used
a UDF with Spark SQL, that code would have to execute in a Python
interpreter on the executor. Is that not the case? This went against my
mental model of how Spark works, so I'm trying to understand what is
causing the missing profile output, and it made me wonder whether the
UDF had run in the JVM.
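For context, PySpark's default BasicProfiler is built on the standard
library's cProfile, so a profile dump can only ever contain entries for
code that actually executed in a Python interpreter. A minimal
standalone sketch of that behaviour, using plain cProfile rather than
Spark (the function name here is made up for illustration):

import cProfile
import pstats
import io

def python_side_udf(tokens):
    # Work that must run in the Python interpreter.
    return [t.upper() for t in tokens]

profiler = cProfile.Profile()
profiler.enable()
result = [python_side_udf(row) for row in [["a", "b"], ["c"]]]
profiler.disable()

# Render the collected stats with pstats, much as show_profiles() does.
buf = io.StringIO()
pstats.Stats(profiler, stream=buf).sort_stats("cumulative").print_stats()
report = buf.getvalue()

# python_side_udf appears in the report because it ran in this
# interpreter; anything evaluated entirely in the JVM never would.
print("python_side_udf" in report)

So an empty dump is at least consistent with nothing having run through
the Python worker for that stage.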

I have created a GitHub repo with this code and with the example code
from ticket SPARK-3478, which does emit a profile dump.



from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf
from pyspark.ml.feature import Word2VecModel
from pyspark.ml.linalg import VectorUDT

if __name__ == '__main__':

     spark = SparkSession.builder.appName("token_to_vec") \
             .config("spark.python.profile", "true") \
             .config("spark.python.profile.dump", "./main_dump/") \
             .config("spark.rdd.compress", "true") \
             .config("spark.dynamicAllocation.enabled", "true") \
             .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
             .config("spark.kryoserializer.buffer.max", "64m") \
             .getOrCreate()

     lines_df = spark.read.parquet("./data/token.parquet")

     vecs = Word2VecModel.load('./data/word_vectors')
     vecs_df = vecs.getVectors()
     vecs_dict = vecs_df.collect()

     vec_dict = spark.sparkContext.broadcast({wv[0]: wv[1] for wv in vecs_dict})
     missing_vec = spark.sparkContext.broadcast(vec_dict.value['MISSING_TOKEN'])

     token_to_vec = udf(lambda r: [vec_dict.value.get(w, missing_vec.value) for w in r], ArrayType(VectorUDT()))

     tdf = lines_df.withColumn("ln_vec", token_to_vec("text"))

     tdf.write.mode("overwrite").parquet("./data/token_vecs.parquet")
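One way to check where a UDF is evaluated, independent of the profiler,
is to inspect the physical plan: a Python UDF shows up as a
BatchEvalPython operator, since its input has to be shipped to a Python
worker. A hedged sketch against a local session (the column name and
lambda here are assumptions, not the code above):

import io
import contextlib
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = (SparkSession.builder
         .master("local[1]")
         .appName("udf_plan_check")
         .getOrCreate())

df = spark.createDataFrame([("hello",)], ["text"])
shout = udf(lambda s: s.upper(), StringType())

# explain() prints the physical plan; capture it so we can look for the
# BatchEvalPython node that marks Python-side evaluation.
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    df.withColumn("loud", shout("text")).explain()
plan = buf.getvalue()

print("BatchEvalPython" in plan)
spark.stop()

If that node is absent from the plan for a given query, the UDF never
reached a Python worker, which would also explain an empty profile dump.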
