Stateful RDD

Stateful RDD

Bao
I started using Spark successfully for some simple transformations. Now I need to execute dynamic scripts to transform data instead of a closure. I can initialize a ScriptEngine in the driver as in the code below (only tested locally). I don't think it would distribute the workload to the workers, because the ScriptEngine lives only at the driver. Would it work if I create a custom RDD and initialize the ScriptEngine there? Any ideas are appreciated - thanks.

import org.apache.spark.SparkContext

object ScriptEngine {
  // Singleton JSR-223 JavaScript engine, defined on the driver.
  val factory = new javax.script.ScriptEngineManager
  val engine = factory.getEngineByName("JavaScript")

  def eval(script: String) = engine.eval(script)
}

object ScriptEvaluator {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "test")

    // Each element of the RDD is a script to evaluate.
    val scripts = sc.makeRDD(List("1+1", "1+2"))
    val resultRdd = scripts.map(s => ScriptEngine.eval(s))

    resultRdd.saveAsTextFile("/tmp/result")
  }
}

Re: Stateful RDD

Bao
It looks like I need to use DStream instead...

Re: Stateful RDD

Christopher Nguyen
Bao, as described, your use case doesn't need to invoke anything like custom RDDs or DStreams.

In a call like

    val resultRdd = scripts.map(s => ScriptEngine.eval(s)) 

Spark will do its best to serialize ScriptEngine and deserialize it on each of the workers, provided ScriptEngine is Serializable.

Now, if it makes no difference to you, consider instantiating ScriptEngine within the closure itself, thus obviating the need for serdes of things outside the closure.
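
A minimal sketch of that approach (using mapPartitions here to create one engine per partition, so nothing non-serializable is captured from the driver; untested):

    val resultRdd = scripts.mapPartitions { iter =>
      // Built inside the task on each worker, so it is never serialized.
      val engine = new javax.script.ScriptEngineManager().getEngineByName("JavaScript")
      iter.map(s => engine.eval(s))
    }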

--
Christopher T. Nguyen
Co-founder & CEO, Adatao


Re: Stateful RDD

Tathagata Das
Just to add to Christopher's suggestion, do make sure that ScriptEngine.eval is thread-safe. If it is not, you can use a ThreadLocal to make sure there is one instance per execution thread.
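
For example, something like this (an untested sketch; it assumes the engine itself is the only shared state):

    object ScriptEngine {
      // One engine per executor thread; initialValue() runs lazily per thread.
      private val engines = new ThreadLocal[javax.script.ScriptEngine] {
        override def initialValue() =
          new javax.script.ScriptEngineManager().getEngineByName("JavaScript")
      }

      def eval(script: String) = engines.get().eval(script)
    }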

TD


Re: Stateful RDD

Bao
Thanks guys, that's interesting. Though the singleton object is defined at the driver, Spark will actually serialize the closure and send it to the workers. The interesting thing is that ScriptEngine is NOT serializable, yet as long as it hasn't been initialized, Spark can serialize the closure fine. But if I force it to initialize first, then Spark throws a NotSerializableException.

Anyway, following Christopher's suggestion to avoid references to anything outside the closure is better.

TD, do you mean that executors share the same SerializerInstance, and that there are cases where more than one thread calls the same closure instance?

-Bao.

Re: Stateful RDD

Tathagata Das
No I did not mean that. What I meant was something more simple. Let's say the ScriptEngine maintains some internal state and the function ScriptEngine.eval(...) is not thread-safe. That is, calling ScriptEngine.eval simultaneously from multiple threads would cause race conditions in the internal state and eval() would give incorrect answers. That would be a problem if you use ScriptEngine in a map function, because multiple threads in a worker JVM may be running the map function simultaneously. Something you should be aware of when using static stateful objects within Spark. 

TD


Re: Stateful RDD

Christopher Nguyen
Bao, to help clarify what TD is saying: Spark launches multiple workers on multiple threads in parallel, running the same closure code in the same JVM on the same machine, but operating on different rows of data.

Because of this parallelism, if that worker code weren't thread-safe for some reason, you'd have a problem.
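
If ThreadLocal doesn't fit, another sketch (untested) is to lock around a single shared engine; it is correct but forces threads to take turns, so it costs parallelism:

    object ScriptEngine {
      private val engine =
        new javax.script.ScriptEngineManager().getEngineByName("JavaScript")

      // Only one thread evaluates at a time.
      def eval(script: String) = engine.synchronized { engine.eval(script) }
    }
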
--
Christopher T. Nguyen
Co-founder & CEO, Adatao


Re: Stateful RDD

Bao
Yeah, it makes sense. Thanks guys.