Are all transformations lazy?

Are all transformations lazy?

David Thomas
For example, is the distinct() transformation lazy?

When I look at the Spark source code, distinct applies a map -> reduceByKey -> map chain to the RDD elements. Why is this lazy? Won't the functions be applied immediately to the elements of the RDD when I call someRDD.distinct?

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)

Re: Are all transformations lazy?

Ewen Cheslack-Postava
You should probably be asking the opposite question: why do you think it *should* be applied immediately? Since the driver program hasn't requested any data back (distinct generates a new RDD, it doesn't return any data), there's no need to actually compute anything yet.

As the documentation describes, if the call returns an RDD, it's transforming the data and will just keep track of the operation it eventually needs to perform. Only methods that return data back to the driver should trigger any computation.

(The one known exception is sortByKey, which really should be lazy, but apparently uses an RDD.count call in its implementation: https://spark-project.atlassian.net/browse/SPARK-1021).
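
To make that concrete, here's a minimal sketch (assuming a running SparkContext called sc and a made-up input path):

  // Nothing is read or computed here: each call just returns a new RDD
  // that records the operation to apply later.
  val words = sc.textFile("hdfs:///tmp/words.txt")   // hypothetical path
  val unique = words.distinct()

  // Only here does a job actually run, because collect() returns data
  // to the driver.
  val result = unique.collect()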


Re: Are all transformations lazy?

David Thomas
I think you misunderstood my question - I should have stated it better. I'm not saying it should be applied immediately; I'm trying to understand how Spark achieves this lazy evaluation of transformations. Maybe this is due to my ignorance of how Scala works, but when I read the code, it looks like the function is applied to the elements of the RDD when I call distinct - or is it not applied immediately? How does the returned RDD 'keep track of the operation'?


Re: Are all transformations lazy?

Mayur Rustagi
The only point where any *actual* computation happens is when data is requested by the driver (e.g. collect()) or materialized to external storage (e.g. saveAsHadoopFile). The rest of the time, the operations are merely recorded. Once you actually ask for the data, the operations are compiled into a DAG of stages, each stage possibly containing multiple operations (e.g. two filter operations can be combined into one stage), and then executed. Hence the operations are all lazy by default.
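
For instance, a minimal sketch along those lines (assuming a SparkContext sc):

  val nums     = sc.parallelize(1 to 100)
  val filtered = nums.filter(_ % 2 == 0).filter(_ > 10)   // nothing runs yet

  // Both filters are narrow transformations, so the scheduler can pipeline
  // them into a single stage once a job is submitted.
  println(filtered.toDebugString)   // prints the recorded lineage only

  filtered.count()   // the action: the DAG is compiled into stages and run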

Mayur Rustagi
Ph: +1 (760) 203 3257


Re: Are all transformations lazy?

Sandy Ryza
In reply to this post by David Thomas
distinct is lazy because the map and reduceByKey functions it calls are lazy as well.  When they're called, the only thing that happens is that state is built up on the client side.  distinct returns an RDD for the final map operation, which points to the RDD it depends on, which in turn points to the RDDs it depends on. These collectively form a transformation graph that the scheduler can use to create tasks when an action is called.
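
Roughly, for the distinct example (a sketch assuming a SparkContext sc; the RDD names in the comments are only illustrative):

  val someRDD = sc.parallelize(Seq(1, 2, 2, 3))

  // Lineage built up by distinct(), approximately:
  //   someRDD
  //     -> mapped RDD for x => (x, null)
  //     -> shuffled RDD from reduceByKey (keeps one value per key)
  //     -> mapped RDD for _._1
  // Each RDD holds only a pointer to its parent plus the function to apply.
  val deduped = someRDD.distinct()

  deduped.count()   // only now does the scheduler turn this graph into tasks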


Re: Are all transformations lazy?

Ewen Cheslack-Postava
In reply to this post by David Thomas
Ah, I see. You need to follow those other calls through to their implementations to see what ultimately happens. For example, the map() calls are to RDD.map, not one of Scala's built-in map methods for collections. The implementation looks like this:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
So once you get down to one of the most primitive operations, like map(), you'll see that the function actually generates a specific type of RDD representing the transformation. MappedRDD just stores a reference to the previous RDD and the function it needs to apply -- it doesn't actually contain any data. Of course the idea is that it *looks* like the normal map(), filter(), etc. in Scala, but it doesn't work the same way.

By calling a bunch of these functions, you end up generating a graph, specifically a DAG, of RDDs. This graph describes all the steps needed to perform the operation, but holds no data. The final action, e.g. count() or collect(), that triggers computation is called on one of these RDDs. To get the value out, the Spark runtime/scheduler traverses the DAG starting from that RDD and triggers evaluation of any parent RDDs it needs that aren't computed and cached yet.

Any future operations build on the same DAG as long as you use the same RDD objects and, if you used cache() or persist(), can reuse the same data after it has been computed the first time.
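
If it helps, here is a toy, stripped-down sketch of the idea (not Spark's actual classes) showing how a map can be recorded without being evaluated:

  abstract class LazyRDD[T] {
    def compute(): Seq[T]                       // only called by an "action"
    def map[U](f: T => U): LazyRDD[U] = new Mapped(this, f)
    def collect(): Seq[T] = compute()           // the "action" forces evaluation
  }

  class Parallelized[T](data: Seq[T]) extends LazyRDD[T] {
    def compute(): Seq[T] = data
  }

  class Mapped[T, U](parent: LazyRDD[T], f: T => U) extends LazyRDD[U] {
    // Holds only the parent reference and the function -- no data.
    def compute(): Seq[U] = parent.compute().map(f)
  }

  // Nothing is evaluated until collect():
  val doubled = new Parallelized(Seq(1, 2, 3)).map(_ * 2)
  println(doubled.collect())   // List(2, 4, 6)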

-Ewen


Re: Are all transformations lazy?

David Thomas
Perfect! That answers my question. I was under the impression that map and reduceByKey were Scala collection functions, but they aren't. Now it makes sense.

Re: Are all transformations lazy?

Rico
It may be confusing at first, but there is also an important difference between the reduce and reduceByKey operations.

reduce is an action on an RDD. Hence, it will trigger evaluation of the transformations that produced the RDD.

In contrast, reduceByKey is a transformation on pair RDDs, not an action. That is why distinct can be implemented entirely as a chain of transformations:

map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
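
A quick sketch of that difference (assuming a SparkContext sc; in a compiled job you may also need import org.apache.spark.SparkContext._ for the pair-RDD implicits):

  val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

  // reduceByKey is a transformation: it returns a new RDD and runs nothing yet.
  val summed = pairs.reduceByKey(_ + _)

  // reduce is an action: it returns a value to the driver, so a job runs here.
  val total = pairs.map(_._2).reduce(_ + _)   // 6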