Is Spark rdd.toDF() thread-safe?


yujhe.li
Hi,

I have an application that runs on a Spark 2.4.4 cluster. It transforms two RDDs to DataFrames with `rdd.toDF()` and then writes them out to files.
To make better use of worker resources, the application runs the jobs from multiple threads. The code snippet looks like this:

import spark.implicits._

Seq(rdd1, rdd2).par.foreach { rdd =>
  rdd.toDF().write.text("xxx")
}

I found that `toDF()` does not appear to be thread-safe: the application sometimes fails with `java.lang.UnsupportedOperationException`.
You can reproduce it with the following snippet (it fails roughly 1% of the time, and the failure is more likely when the case class has a large number of fields):

package example

import org.apache.spark.sql.SparkSession

object SparkToDF {

  case class SampleData(
                         field_1: Option[Int] = None,
                         field_2: Option[Long] = Some(0L),
                         field_3: Option[Double] = None,
                         field_4: Option[Float] = None,
                         field_5: Option[Short] = None,
                         field_6: Option[Byte] = None,
                         field_7: Option[Boolean] = Some(false),
                         field_8: Option[StructData] = Some(StructData(Some(0), Some(0L), None, None, None, None, None)),
                         field_9: String = "",
                         field_10: String = "",
                         field_11: String = "",
                         field_12: String = "",
                         field_13: String = "",
                         field_14: String = "",
                         field_15: String = "",
                         field_16: String = "",
                         field_17: String = "",
                         field_18: String = "",
                         field_19: String = "",
                         field_20: String = "",
                         field_21: String = "",
                         field_22: String = "",
                         field_23: String = "",
                         field_24: String = "",
                         field_25: String = "",
                         field_26: String = "",
                         field_27: String = "",
                         field_28: String = "",
                         field_29: String = "",
                         field_30: String = "",
                         field_31: String = "",
                         field_32: String = "",
                         field_33: String = "",
                         field_34: String = "",
                         field_35: String = "",
                         field_36: String = "",
                         field_37: String = "",
                         field_38: String = "",
                         field_39: String = "",
                         field_40: String = "",
                         field_41: String = "",
                         field_42: String = "",
                         field_43: String = "",
                         field_44: String = "",
                         field_45: String = "",
                         field_46: String = "",
                         field_47: String = "",
                         field_48: String = "",
                         field_49: String = "",
                         field_50: String = "",
                         field_51: String = "",
                         field_52: String = "",
                         field_53: String = "",
                         field_54: String = "",
                         field_55: String = "",
                         field_56: String = "",
                         field_57: String = "",
                         field_58: String = "",
                         field_59: String = "",
                         field_60: String = ""
                       )

  case class StructData(
                         field_1: Option[Int],
                         field_2: Option[Long],
                         field_3: Option[Double],
                         field_4: Option[Float],
                         field_5: Option[Short],
                         field_6: Option[Byte],
                         field_7: Option[Boolean]
                       )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ThreadSafeToDF()")
      .getOrCreate()

    import spark.implicits._

    try {
      // Call toDF() concurrently from 10 threads; each call re-derives the
      // encoder for SampleData through Scala reflection.
      (1 to 10).par.foreach { _ =>
        (1 to 10000).map(_ => SampleData()).toDF()
      }
    } finally {
      spark.close()
    }
  }
}

Then submit it repeatedly with spark-submit (the failure is intermittent):

for i in {1..200}; do
  echo "iter: $i ($(date))"
  ./bin/spark-submit --class example.SparkToDF your.jar
done

You may get an exception message similar to `Schema for type A is not supported`:

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Float] is not supported                                                                                                                
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:812)                                                                                                                                
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:743)                                                                                                                                
        at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)                                                                                                                                                  
        at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:929)                                                                                                                            
        at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)                                                                                                                                  
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:742)                                                                                                                                                
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:407)                                                                
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:406)                                                                
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)                                                                                                                                                   
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)                                                                                                                                                   
        at scala.collection.immutable.List.foreach(List.scala:392)                                                                                                                                                                            
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)                                                                                                                                                              
        at scala.collection.immutable.List.map(List.scala:296)                                                                                                                                                                                
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:406)                                                                           
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:176)
        at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:929)
        at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:176)
        at org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:164)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
        at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
        at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
        at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)

From the stack trace, the exception is thrown in `ScalaReflection.schemaFor()`.
I looked at the code: Spark uses Scala runtime reflection to derive the schema, and as far as I know Scala reflection has known concurrency issues.

https://issues.apache.org/jira/browse/SPARK-26555
thread-safety-in-scala-reflection-with-type-matching
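
For what it's worth, my reading of the stack trace is that the race is in the encoder derivation itself, not in writing the DataFrame: `newProductEncoder` (SQLImplicits.scala:248) calls `Encoders.product`, which builds an `ExpressionEncoder` via `ScalaReflection`. So I would expect a narrowed-down repro that skips `toDF()` entirely and just derives the encoder from several threads to fail the same way. A sketch only (untested; it assumes the SampleData case class from the repro above is on the classpath):

import org.apache.spark.sql.Encoders
import example.SparkToDF.SampleData

object EncoderRace {
  def main(args: Array[String]): Unit = {
    // Derive the product encoder for SampleData from 10 threads at once.
    // Encoders.product[T] walks the case class with ScalaReflection.schemaFor(),
    // which is the frame that throws in the stack trace above. No SparkSession
    // is needed for this step.
    (1 to 10).par.foreach { _ =>
      Encoders.product[SampleData]
    }
  }
}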

Should this be fixed in Spark? I cannot find any documentation about thread-safety when creating DataFrames.

I worked around this by adding a lock around the RDD-to-DataFrame transformation:

object toDFLock

import spark.implicits._

Seq(rdd1, rdd2).par.foreach { rdd =>
  toDFLock.synchronized(rdd.toDF()).write.text("xxx")
}
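
An alternative I am considering (just a sketch; it assumes both RDDs hold the same case class, SampleData from above, and the explicit encoder is my addition rather than something from the Spark docs) is to derive the encoder once on the driver thread before the parallel block and pass it explicitly, so the reflection never runs concurrently:

import org.apache.spark.sql.{Encoder, Encoders}

// Derive the encoder once, single-threaded. This is the step that runs
// ScalaReflection.schemaFor(), so doing it up front keeps the reflection
// out of the parallel section entirely.
val sampleDataEncoder: Encoder[SampleData] = Encoders.product[SampleData]

Seq(rdd1, rdd2).par.foreach { rdd =>
  // createDataset takes the pre-built encoder explicitly, so no per-thread
  // encoder derivation (and no concurrent reflection) happens here.
  spark.createDataset(rdd)(sampleDataEncoder).toDF().write.text("xxx")
}

Unlike the global lock, this would let the two writes still overlap.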