Questions about caching

Andrew Melo
Greetings, Spark Aficionados-

I'm working on a project to (ab-)use PySpark to do particle physics
analysis, which involves iterating with a lot of transformations (to
apply weights and select candidate events) and reductions (to produce
histograms of relevant physics objects). We have a basic version
working, but I'm looking to exploit some of Spark's caching behavior
to speed up the interactive computation portion of the analysis,
probably by writing a thin convenience wrapper. I have a couple of
questions I've been unable to find definitive answers to, which would
help me design this wrapper in an efficient way:

1) When cache()-ing a dataframe where only a subset of the columns are
used, is the entire dataframe placed into the cache, or only the used
columns? E.g., does "df2" end up caching only "a", or all three
columns?

df1 = spark.read.load('test.parquet')  # read via the SparkSession; has columns a, b, c
df2 = df1.cache()
df2.select('a').collect()
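
If the answer turns out to be "all three columns", the workaround I'd reach for is caching only the projection that is actually used. A rough sketch of that idea (my own illustration, not confirmed behavior; assumes a SparkSession named spark, and DataFrame.storageLevel requires Spark 2.1+):

narrow = spark.read.load('test.parquet').select('a').cache()  # cache just column 'a'
narrow.count()               # an action, to actually materialize the cache
print(narrow.is_cached)      # True once cache() has been requested
print(narrow.storageLevel)   # the storage level Spark will use for this plan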

2) Are caches reference-based, or is there some sort of de-duplication
based on the logical/physical plans? For instance, does Spark take
advantage of the fact that these two dataframes should have the same
content:

df1 = spark.read.load('test.parquet').cache()
df2 = spark.read.load('test.parquet').cache()

...or are df1 and df2 totally independent WRT caching behavior?

2a) If the cache is reference-based, is it sufficient to hold a
weakref to the Python object to keep the cache in scope?
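
For concreteness, here is a rough sketch of the kind of thin wrapper I have in mind, which would sidestep the question by de-duplicating on the Python side: one cached DataFrame per input path, held by strong references (all names here are hypothetical; assumes a SparkSession named spark):

# Hypothetical convenience wrapper: one cached DataFrame per input path,
# so repeated loads in the same session reuse a single cache entry.
_cached = {}   # path -> DataFrame; strong references keep the caches alive

def load_cached(spark, path):
    """Return a cached DataFrame for `path`, creating and caching it on first use."""
    if path not in _cached:
        _cached[path] = spark.read.load(path).cache()
    return _cached[path]

def drop_cached(path):
    """Unpersist and forget the cached DataFrame for `path`."""
    df = _cached.pop(path, None)
    if df is not None:
        df.unpersist()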

3) Finally, the spark.externalBlockStore.blockManager option is intriguing in
our environment, where we have multiple users concurrently analyzing
mostly the same input datasets. We have enough RAM in our clusters to
cache a high percentage of the most common datasets, but only if users
could somehow share their caches (which, conveniently, are the larger
datasets). We also have very large edge SSD cache servers, used to
cache trans-oceanic I/O, that we could throw at this as well.

It looks, however, like that API was removed in 2.0.0 without a
replacement. There are products like Alluxio, but they aren't
transparent: the user has to manually cache their dataframes by
saving/loading them as external files using "alluxio://" URIs. Is there
no way around this now?

Sorry for the long email, and thanks!
Andrew

Re: Questions about caching

Reza Safi
Hi Andrew,
1) df2 will cache all the columns.
2) In Spark 2 you will receive a warning like the following (a minimal way to trigger it is sketched after this list):

WARN execution.CacheManager: Asked to cache already cached data.

I don't recall whether it is the same in 1.6; it seems you are not using Spark 2.
2a) I'm not sure whether you are suggesting a new feature for Spark. Maybe someone else with more experience with PySpark can respond to you.
3) Have you considered using an external block store? Also, if this is an academic environment, there may be much easier ways to handle this.
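
For 2), a minimal way to trigger that warning (a sketch only; assumes a SparkSession named spark and an existing test.parquet):

# Two independently constructed DataFrames over the same source. In Spark 2
# the CacheManager compares logical plans, so the second cache() call should
# log "Asked to cache already cached data" instead of building a second copy.
df1 = spark.read.load('test.parquet').cache()
df1.count()                                    # action that materializes the cache
df2 = spark.read.load('test.parquet').cache()  # expected to hit the warning
df2.count()                                    # should reuse the cached data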

Best,
Reza.

Re: Questions about caching

Bin Fan
Hi Andrew,

Since you mentioned the alternative solution with Alluxio, here is a more comprehensive
tutorial on caching Spark DataFrames in Alluxio:

Namely, caching your DataFrame is simply a matter of running

df.write.parquet(alluxioFilePath)

Your DataFrames are then stored in Alluxio as Parquet files, and you can share them with other users.
One advantage of Alluxio here is that you can manually free the cached data from the memory tier, or
set a TTL on the cached data, if you'd like more control over it.
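
For example, a minimal sharing pattern could look like the sketch below (the Alluxio master address and path are hypothetical; df and spark are as in your example):

# One user writes the DataFrame into Alluxio as Parquet files.
alluxio_path = 'alluxio://alluxio-master:19998/datasets/test.parquet'  # hypothetical location
df.write.mode('overwrite').parquet(alluxio_path)

# Any other user/session reads the same copy back; Alluxio serves it from
# its memory/SSD tiers rather than re-fetching the remote source.
shared_df = spark.read.parquet(alluxio_path)
shared_df.select('a').count()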


- Bin

Re: Questions about caching

Gourav Sengupta
Hi Andrew,
If you use the Spark UI, then all of your questions are already answered there. Let me know if you need any help browsing the UI to look at the contents that are cached.
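
For a quick programmatic check alongside the UI, something like this sketch also works (assumes a SparkSession named spark; storageLevel is a DataFrame property from Spark 2.1 on):

# Programmatic counterparts to the Storage tab of the Spark UI.
df = spark.read.load('test.parquet').cache()
df.count()                    # materialize the cache
print(df.is_cached)           # True once cache() has been requested
print(df.storageLevel)        # the StorageLevel in use (MEMORY_AND_DISK by default for DataFrames)
spark.catalog.clearCache()    # drop everything cached in this session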

Regards,
Gourav
