spark-user mailing list archives

From: Alex <a...@unexpectedeof.net>
Subject: [PySpark Profiler]: Does empty profile mean no execution in Python Interpreter?
Date: Fri, 02 Nov 2018 03:00:34 GMT
Hi,

I ran into an interesting scenario today: no profile output at all. I have
a PySpark application that primarily uses the Spark SQL APIs. I understand
that parts of the Spark SQL API may not generate data in the PySpark
profile dumps, but I was surprised that code containing a UDF did not
generate any profile output either. I had thought that any time I used a
UDF with Spark SQL, that code would have to execute in a Python
interpreter on the executor. Is that not the case? The empty profile goes
against my mental model of how this works in Spark, so I'm trying to
understand what is happening here, and it made me wonder whether the UDF
had somehow run in the JVM instead.
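As a sanity check on my mental model, here is a minimal sketch I'd expect
to prove where the UDF runs (the app name and toy DataFrame are made up
for illustration, not from my actual job): a UDF that tags each row with
os.getpid() should show worker PIDs that differ from the driver's PID if
the lambda really executes in a Python worker.

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("pid_check").getOrCreate()

# Toy DataFrame; any string column works
df = spark.createDataFrame([("hello",), ("world",)], ["text"])

# Tag each row with the PID of the process that evaluated the UDF
tag_pid = udf(lambda s: "%s:%d" % (s, os.getpid()), StringType())

print("driver pid: %d" % os.getpid())
df.withColumn("tagged", tag_pid("text")).show(truncate=False)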

I have created a GitHub repo with this code in main.py, along with the
example code from SPARK-3478 (https://github.com/apache/spark/pull/2556)
in py_profile.py, which does emit a profile dump:

https://github.com/AlexHagerman/pyspark-profiling
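For reference, py_profile.py is essentially the RDD-based example from
that PR; a minimal sketch along those lines (the exact script in my repo
may differ slightly):

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext("local", "profile_test", conf=conf)

# A plain RDD transformation always executes in the Python worker,
# so it shows up in show_profiles()
sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)

sc.show_profiles()
sc.stop()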

Thanks,

Alex


from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf
from pyspark.ml.feature import Word2VecModel
from pyspark.ml.linalg import VectorUDT

if __name__ == '__main__':

    # Kryo buffer key is spark.kryoserializer.buffer (was misspelled
    # as "spark.kryoserializer", which Spark silently ignores)
    spark = SparkSession.builder.appName("token_to_vec") \
            .config("spark.python.profile", "true") \
            .config("spark.python.profile.dump", "./main_dump/") \
            .config("spark.rdd.compress", "true") \
            .config("spark.dynamicAllocation.enabled", "true") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.kryoserializer.buffer", "64k") \
            .getOrCreate()

    lines_df = spark.read.parquet("./data/token.parquet")

    # getVectors() returns a DataFrame of (word, vector) rows;
    # collect() pulls them to the driver as a list of Rows
    vecs = Word2VecModel.load('./data/word_vectors')
    vecs_df = vecs.getVectors()
    vecs_rows = vecs_df.collect()

    # Broadcast the word -> vector lookup table and a fallback vector
    vec_dict = spark.sparkContext.broadcast({wv[0]: wv[1] for wv in vecs_rows})
    missing_vec = spark.sparkContext.broadcast(vec_dict.value['MISSING_TOKEN'])

    # Map each token in the "text" array column to its vector; this lambda
    # should execute in a Python worker on the executor
    token_to_vec = udf(lambda r: [vec_dict.value.get(w, missing_vec.value) for w in r],
                       ArrayType(VectorUDT()))

    tdf = lines_df.withColumn("ln_vec", token_to_vec("text"))

    # mode was previously passed twice (via .mode() and the mode= kwarg)
    tdf.write.mode("overwrite").parquet(path="./data/token_vecs.parquet",
                                        compression="snappy")

    spark.sparkContext.show_profiles()
    spark.stop()



