[PySpark Profiler]: Does empty profile mean no execution in Python Interpreter?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

[PySpark Profiler]: Does empty profile mean no execution in Python Interpreter?



I ran into an interesting scenario with no profile output today. I have a PySpark application that primarily uses the Spark SQL APIs. I understand that parts of the Spark SQL API may not generate data in the PySpark profile dumps, but I was surprised when I had code containing a UDF that did not generate any profile output. I had thought anytime I used a UDF with Spark SQL that code would have to execute in a Python interpreter on the executor. Is that not the case? This went against my mental model for how this works in Spark, so I'm trying to understand what is happening here to cause no profile output, which made me wonder if the UDF had ran in the JVM.

I have created a github repo with this code in main.py and the example code in ticket 3478 https://github.com/apache/spark/pull/2556 in py_profile.py which does emit a profile dump.




from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import broadcast, udf
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.linalg import Vector, VectorUDT

if __name__ == '__main__':

    spark = SparkSession.builder.appName("token_to_vec") \
            .config("spark.python.profile", "true") \
            .config("spark.python.profile.dump", "./main_dump/") \
            .config("spark.rdd.compress", "true") \
            .config("spark.dynamicAllocation.enabled", "true") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.kryoserializer", "64") \

    lines_df = spark.read.parquet("./data/token.parquet")

    vecs = Word2VecModel.load('./data/word_vectors')
    vecs_df = vecs.getVectors()
    vecs_dict = vecs_df.collect()

    vec_dict = spark.sparkContext.broadcast({wv[0]: wv[1] for wv in vecs_dict})
    missing_vec = spark.sparkContext.broadcast(vec_dict.value['MISSING_TOKEN'])

    token_to_vec = udf(lambda r: [vec_dict.value.get(w, missing_vec.value) for w in r], ArrayType(VectorUDT()))

    tdf = lines_df.withColumn("ln_vec", token_to_vec("text"))

    tdf.write.mode("overwrite").parquet(path="./data/token_vecs.parquet", mode="overwrite", compression="snappy")