Spark with Scala : understanding closures or best way to take udf registrations' code out of main and put in utils



aastha
This is more of a Scala concept question than a Spark one. I have this Spark
initialization code:

    object EntryPoint {
      val spark = SparkFactory.createSparkSession(...
      val funcsSingleton = ContextSingleton[CustomFunctions] {
        new CustomFunctions(Some(hashConf))
      }
      lazy val funcs = funcsSingleton.get
      // this part I want moved to another place, since there are many, many UDFs
      spark.udf.register("funcName", udf { funcName _ })
    }
The other class, CustomFunctions, looks like this:

    class CustomFunctions(val hashConfig: Option[HashConfig],
                          spark: Option[SparkSession] = None) {
      val funcUdf = udf { funcName _ }
      def funcName(colValue: String) = withDefinedOpt(hashConfig) { c =>
        ...}
    }
This class is made Serializable by wrapping it in ContextSingleton, which is
defined like so:
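The `udf { funcName _ }` line is where the serialization trouble usually starts: eta-expanding an instance method captures `this`, i.e. the whole CustomFunctions instance. Here is a minimal, Spark-free sketch of that effect (the `HeavyFuncs` class and `canSerialize` helper are made up for illustration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a heavyweight, non-serializable class like CustomFunctions.
class HeavyFuncs {
  def tag(s: String): String = s + "!"
}

// Attempts Java serialization and reports whether it succeeded.
def canSerialize(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

val funcs = new HeavyFuncs
// Eta-expanding an instance method closes over `funcs` itself...
val methodRef: String => String = funcs.tag
// ...so serializing the function means serializing HeavyFuncs, which fails.
assert(!canSerialize(methodRef))

// Copying just the needed data into a local val avoids capturing the instance.
val suffix = "!"
val localFn: String => String = s => s + suffix
assert(canSerialize(localFn))
```

This is the same capture that makes Spark complain when a task closure drags in a non-serializable enclosing object.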

    class ContextSingleton[T: ClassTag](constructor: => T) extends AnyRef
        with Serializable {
      val uuid = UUID.randomUUID.toString
      @transient private lazy val instance = ContextSingleton.pool.synchronized {
        ContextSingleton.pool.getOrElseUpdate(uuid, constructor)
      }
      def get = instance.asInstanceOf[T]
    }

    object ContextSingleton {
      private val pool = new TrieMap[String, Any]()
      def apply[T: ClassTag](constructor: => T): ContextSingleton[T] =
        new ContextSingleton[T](constructor)
      def poolSize: Int = pool.size
      def poolClear(): Unit = pool.clear()
    }

Now to my problem: I don't want to have to register the UDFs explicitly in the
EntryPoint app, as done above. I create all the UDFs I need in my
CustomFunctions class and want to dynamically register only the ones named in
user-provided config. What would be the best way to achieve that? Also, when I
try to register the required UDFs outside the main app, I get the infamous
`Task not serializable` exception. Serializing the big CustomFunctions class
is not a good idea, hence I wrapped it in ContextSingleton, but that does not
solve the problem of registering UDFs outside main. Please suggest the right
approach.
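One shape the config-driven part could take (a hedged, Spark-free sketch: `FakeUdfRegistry`, `byName`, and the function names are all invented, with the registry standing in for `spark.udf`) is a name-indexed map of functions inside CustomFunctions plus a small helper that registers only the names found in config. Each registered lambda captures only the data it uses (here a String), not the enclosing instance:

```scala
// Hypothetical sketch: keep every UDF in a name-indexed map, then register
// only the names the user's config asks for.
class CustomFunctions(prefix: String) {
  val byName: Map[String, String => String] = Map(
    "addPrefix" -> { s => prefix + s },     // captures only `prefix`
    "shout"     -> { s => s.toUpperCase }
  )
}

// Minimal stand-in for the spark.udf registration interface.
class FakeUdfRegistry {
  private var registered = Map.empty[String, String => String]
  def register(name: String, f: String => String): Unit = registered += name -> f
  def registeredNames: Set[String] = registered.keySet
  def apply(name: String, arg: String): String = registered(name)(arg)
}

// Registers only the UDFs listed in user-provided config.
def registerFromConfig(registry: FakeUdfRegistry,
                       funcs: CustomFunctions,
                       wanted: Seq[String]): Unit =
  wanted.foreach { name => registry.register(name, funcs.byName(name)) }

val registry = new FakeUdfRegistry
registerFromConfig(registry, new CustomFunctions("pre_"), Seq("shout"))
assert(registry.registeredNames == Set("shout"))
```

In the real app the helper would take the SparkSession (and the singleton's `get` result) as plain parameters, so it can live in a utils object outside main.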
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
