Why does collect() have a stage but first() doesn't?


David Thomas
If I perform a 'collect' action on the RDD, I can see a new stage being created in the Spark web UI (http://master:4040/stages/), but when I run a 'first' action, no stage appears. However, on the console I see these lines:

14/02/19 10:51:31 INFO SparkContext: Starting job: first at xxx.scala:110
14/02/19 10:51:31 INFO DAGScheduler: Got job 110 (first at xxx.scala:110) with 1 output partitions (allowLocal=true)
14/02/19 10:51:31 INFO DAGScheduler: Final stage: Stage 2 (first at xxx.scala:110)
14/02/19 10:51:31 INFO DAGScheduler: Parents of final stage: List()
14/02/19 10:51:31 INFO DAGScheduler: Missing parents: List()

So why doesn't the webUI list the stages created when I run the 'first' action?
Re: Why does collect() have a stage but first() doesn't?

Aaron Davidson
first() is allowed to "run locally", which means that the driver will execute the action itself without launching any tasks. This is also true of take(n) for sufficiently small n, for instance.
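
To see the difference yourself, here is a minimal, hypothetical example (the object name, app name, data, and partition count are all made up; it assumes a Spark build of roughly this era on the classpath):

  import org.apache.spark.{SparkConf, SparkContext}

  object FirstVsCollect {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("FirstVsCollect").setMaster("local[2]"))
      val rdd = sc.parallelize(1 to 1000, 4)

      rdd.collect() // runs tasks on all 4 partitions, so a stage shows up in the web UI
      rdd.first()   // needs one element from partition 0; with allowLocal=true the
                    // driver can evaluate it itself, so no stage appears

      sc.stop()
    }
  }

With a local master like this, the UI is at http://localhost:4040/stages/ rather than the master URL above.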


Re: Why does collect() have a stage but first() doesn't?

David Thomas
But my RDD is distributed across the worker nodes. So how can the driver perform the action by itself?


Re: Why does collect() have a stage but first() doesn't?

Aaron Davidson
The driver does query for the first partition of the RDD using the BlockManager. If the RDD is cached, the worker process that has the first partition in memory will ship it back to the driver, and the driver will iterate over it.
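
The take(n)/first() strategy can be sketched in plain Scala (an illustrative model, not Spark's actual internals; takeFromPartitions and the Seq-based partitions are invented for the example): scan partitions one at a time and stop as soon as enough elements have been gathered, instead of computing everything up front.

  import scala.collection.mutable.ArrayBuffer

  // Partitions are modeled here as plain in-memory Seqs, standing in for
  // partition iterators that may be served out of the BlockManager cache.
  def takeFromPartitions[T](partitions: Seq[Seq[T]], n: Int): Seq[T] = {
    val buf = ArrayBuffer.empty[T]
    val it = partitions.iterator
    while (buf.size < n && it.hasNext) {
      buf ++= it.next().take(n - buf.size) // pull only what is still needed
    }
    buf.toSeq
  }

  takeFromPartitions(Seq(Seq(1, 2, 3), Seq(4, 5)), 1) // Seq(1); touches one partition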


Re: Why does collect() have a stage but first() doesn't?

David Thomas
Thanks for the info. Is there any documentation that explains these things?


Re: Why does collect() have a stage but first() doesn't?

Mark Hamstra
Not really, since it's considered mostly an implementation detail that shouldn't be important to users and that could conceivably change in the future.  There are some useful comments in the code if you really want to figure out how it is implemented.

And I will take a somewhat pedantic exception to the notion that first(), take() and other runLocally jobs don't have an associated Stage.  They do have a Stage, but that Stage isn't run via the task scheduler as are non-local jobs, and that is why the Stage doesn't show up in the webUI.  If it really made a difference, we could report the Stage start and stop events for runLocally jobs so that they could appear (briefly) in the webUI; but even then, Aaron is correct that there are no scheduler Tasks for a runLocally job, so runLocally jobs would still not look the same in the webUI as other jobs. 
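
One way to observe this yourself, sketched against the SparkListener API (hedged: it assumes addSparkListener is available in your Spark version, and reuses the sc and rdd from the earlier example), is to count stage-submitted events; collect() should increment the counter, while a locally-run first() should not:

  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}
  import java.util.concurrent.atomic.AtomicInteger

  val stagesSubmitted = new AtomicInteger(0)
  sc.addSparkListener(new SparkListener {
    // Fired only for stages that go through the task scheduler.
    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
      stagesSubmitted.incrementAndGet()
  })

  rdd.collect() // counter goes up: the stage ran through the scheduler
  rdd.first()   // run locally: no stage-submitted event, counter unchanged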

