Basic question on RDD caching


David Thomas
When I persist/cache an RDD, are all the derived RDDs cached as well, or do I need to call cache individually on each RDD if I want them cached?

For example:

val originalRDD = sc.parallelize(...)
originalRDD.cache()
val derivedRDD = originalRDD.map(...)

Is derivedRDD cached in this case?

Re: Basic question on RDD caching

Ewen Cheslack-Postava

Only originalRDD is cached. You need to call cache/persist for every RDD you want cached.
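
For example (a minimal sketch, assuming a SparkContext named sc, e.g. from spark-shell; the data and names are made up):

val originalRDD = sc.parallelize(1 to 1000)
originalRDD.cache()                      // marks only originalRDD for caching

val derivedRDD = originalRDD.map(_ * 2)
derivedRDD.cache()                       // derivedRDD has to be marked separately

derivedRDD.count()                       // the first action materializes and caches both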

Re: Basic question on RDD caching

David Thomas
Thanks.

Another basic question:

Let's say derivedRDD is much larger than originalRDD and doesn't fit into memory. Will Spark take care of automatically spilling it to disk, or will I face a Java heap out-of-memory error?

Re: Basic question on RDD caching

Ewen Cheslack-Postava
That actually depends on what operations follow it. The RDD DAG gets broken into stages based on a) actions that pull data back to the driver (like collect()) and b) RDDs that require shuffles (e.g. join() where the two inputs aren't already partitioned identically). All the steps within a stage are executed together, so an intermediate RDD from the middle of a stage never has to be fully materialized in memory; only the input data to the stage and its output do. So, for example, if you have something like this:

originalRDD.map(...).filter(...).join(otherInputRDD)

then the map and filter will be performed at the same time: each input element is read, passed through the map function, then evaluated by the filter function and either passed on or dropped.
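
As a rough sketch of that pipelining (assuming a SparkContext named sc from spark-shell; the pair RDDs and data here are made up so that join() is legal), toDebugString prints the lineage and shows where the shuffle dependency, i.e. the stage boundary, falls:

import org.apache.spark.SparkContext._    // pair-RDD operations like join (needed on older Spark)

val originalRDD   = sc.parallelize(1 to 1000).map(i => (i % 10, i))
val otherInputRDD = sc.parallelize(1 to 100).map(i => (i % 10, i * i))

val result = originalRDD
  .map { case (k, v) => (k, v + 1) }      // map and filter run pipelined within one stage:
  .filter { case (_, v) => v % 2 == 0 }   // each element flows through both, one at a time
  .join(otherInputRDD)                    // the join introduces a shuffle, i.e. a new stage

println(result.toDebugString)             // the printed lineage shows the shuffle dependency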

When data is shuffled, it is *always* written to disk. If your OS has enough memory to keep that data in its buffer cache, it may never actually hit the disk (Spark doesn't call fsync). That's why the hardware provisioning page http://spark.incubator.apache.org/docs/latest/hardware-provisioning.html suggests:

"In all cases, we recommend allocating only at most 75% of the memory for Spark; leave the rest for the operating system and buffer cache."

-Ewen
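
As a concrete illustration of that 75% guideline (hypothetical numbers, not from the docs page): on a worker with 8 GB of RAM you might give each executor roughly 6 GB via spark.executor.memory and leave the rest to the OS and its buffer cache.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sizing for an 8 GB worker: ~75% for Spark, the rest for the OS/buffer cache.
val conf = new SparkConf()
  .setAppName("rdd-caching-example")
  .set("spark.executor.memory", "6g")
val sc = new SparkContext(conf)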
