Communication between Driver and Executors

Communication between Driver and Executors

Tobias Pfeiffer
Hi,

(this is related to my previous question about stopping the StreamingContext)

is there any way to send a message from the driver to the executors? There is all this Akka machinery running, so it should be easy to have something like

  sendToAllExecutors(message)

on the driver and

  handleMessage {
    case _ => ...
  }

on the executors, right? Surely at least for Broadcast.unpersist() such a thing must exist, so can I use it somehow (dirty way is also ok) to send a message to my Spark nodes?

Thanks
Tobias
Re: Communication between Driver and Executors

Mayur Rustagi
I wonder if SparkConf is dynamically updated on all worker nodes or only during initialization. It could be used to piggyback information.
Otherwise I guess you are stuck with Broadcast.
I have mainly hit these issues when moving legacy MR operators to Spark, where MR piggybacks on the Hadoop conf pretty heavily; in native Spark applications it's rarely required. Do you have a use case like that?


Mayur Rustagi
Ph: +1 (760) 203 3257


Re: Communication between Driver and Executors

Tobias Pfeiffer
Hi,

On Fri, Nov 14, 2014 at 3:20 PM, Mayur Rustagi <[hidden email]> wrote:
I wonder if SparkConf is dynamically updated on all worker nodes or only during initialization. It could be used to piggyback information.
Otherwise I guess you are stuck with Broadcast.
I have mainly hit these issues when moving legacy MR operators to Spark, where MR piggybacks on the Hadoop conf pretty heavily; in native Spark applications it's rarely required. Do you have a use case like that?

My "use case" is http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-does-not-stop-td18826.html – that is, notifying my Spark executors that the StreamingContext has been shut down. (Even with a non-graceful shutdown, Spark does not seem to end the actual execution, just all the Spark-internal timers etc.) I need to do this properly, or processing will go on for a very long time.

I have been trying to misuse a broadcast variable as follows:
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to processing the next item
- when I want to shut down, set the boolean to false and unpersist the broadcast variable (which will trigger re-delivery).
This is very dirty, but it works with a "local[*]" master. Unfortunately, when deployed on YARN, the new value never arrives at my executors.
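For the record, a minimal sketch of the pattern I mean is below. The flag holder itself is plain Scala; the Spark-specific wiring (sc.broadcast, flag.value, unpersist) is only noted in comments, and the names (StopFlag, processIfRunning) are made up for illustration. As noted above, this trick only worked for me with a "local[*]" master:

```scala
object ShutdownSketch {
  // The mutable flag that would be shipped to executors inside a broadcast
  // variable (with Spark: val flag = sc.broadcast(new StopFlag)).
  class StopFlag extends Serializable {
    @volatile var keepRunning: Boolean = true
  }

  val flag = new StopFlag

  // Executor side: consult the flag before processing the next item
  // (with Spark, inside the closure this would read flag.value.keepRunning).
  def processIfRunning[A](item: A)(f: A => Unit): Boolean =
    if (flag.keepRunning) { f(item); true } else false

  // Driver side, at shutdown: flip the flag, then unpersist the broadcast
  // so executors re-fetch it (with Spark: flag.unpersist()).
  def requestShutdown(): Unit = flag.keepRunning = false
}
```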

Any idea what could go wrong on YARN with this approach – or what is a "good" way to do this?

Thanks
Tobias


Re: Communication between Driver and Executors

Tobias Pfeiffer
Hi again,

On Mon, Nov 17, 2014 at 8:16 AM, Tobias Pfeiffer <[hidden email]> wrote:
I have been trying to misuse a broadcast variable as follows:
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to processing the next item
- when I want to shut down, set the boolean to false and unpersist the broadcast variable (which will trigger re-delivery).
This is very dirty, but it works with a "local[*]" master. Unfortunately, when deployed on YARN, the new value never arrives at my executors.

In fact, it seems that "change a mutable object (like a mutable list) and unpersist in order to trigger redeployment" only works locally. When running on YARN, even after an unpersist, the value is always identical to what I shipped first. Now I wonder what unpersist actually does in that case. Must I call unpersist from an executor or from the driver?

Thanks
Tobias

Re: Communication between Driver and Executors

Tobias Pfeiffer
Hi,

so I didn't manage to get the broadcast variable with a new value distributed to my executors in YARN mode. In local mode it worked fine, but when running on YARN either nothing happened (when unpersist() was called on the driver) or I got a TimeoutException (when it was called on an executor).
I finally dropped the use of broadcast variables and added an HTTP polling mechanism from the executors to the driver. I find that a bit suboptimal, in particular since the whole Akka infrastructure is already running and I should be able to just send messages around. However, Spark does not seem to encourage this. (In general I find that "private" is a bit overused in the Spark codebase...)
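In case it helps anyone, here is roughly what that polling mechanism looks like, stripped of the Spark parts. It uses only the JDK's built-in com.sun.net.httpserver; the object names, the /status path, and the STOP/RUN protocol are made up for this sketch, and how the executors learn the driver's host and port is left out:

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.{InetSocketAddress, URL}
import java.util.concurrent.atomic.AtomicBoolean
import scala.io.Source

// Driver side: a tiny HTTP endpoint that executors can poll for the
// shutdown status.
object ShutdownEndpoint {
  val stopped = new AtomicBoolean(false)

  def start(port: Int): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext("/status", new HttpHandler {
      override def handle(ex: HttpExchange): Unit = {
        val body = (if (stopped.get) "STOP" else "RUN").getBytes("UTF-8")
        ex.sendResponseHeaders(200, body.length)
        val os = ex.getResponseBody
        try os.write(body) finally os.close()
      }
    })
    server.start()
    server
  }
}

// Executor side: before processing the next item/batch, ask the driver
// whether processing should continue.
object ExecutorPoller {
  def shouldContinue(driverHost: String, port: Int): Boolean = {
    val src = Source.fromURL(new URL(s"http://$driverHost:$port/status"))
    try src.mkString.trim != "STOP" finally src.close()
  }
}
```

The polling call goes inside the per-partition processing loop on the executors; a real version would also want a timeout and a fallback when the driver is unreachable.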

Thanks
Tobias