[Spark R]: dapply only works for very small datasets

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

[Spark R]: dapply only works for very small datasets

Kunft, Andreas

Hello,


I tried to execute some user defined functions with R using the airline arrival performance dataset.

While the examples from the documentation for the `<-` apply operator work perfectly fine on a size ~9GB,

the `dapply` operator fails to finish even after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stackoverflow and even asked the question there as well, but till now the only answer I got was:
"Avoid using dapply, gapply"

So, do I miss some parameters or is there are general limitation?
I'm using Spark 2.2.0 and read the data from HDFS 2.7.1 and played with several DOPs.

Best
Andreas


Reply | Threaded
Open this post in threaded view
|

Re: [Spark R]: dapply only works for very small datasets

Felix Cheung
What’s the number of executor and/or number of partitions you are working with?

I’m afraid most of the problem is with the serialization deserialization overhead between JVM and R...


From: Kunft, Andreas <[hidden email]>
Sent: Monday, November 27, 2017 10:27:33 AM
To: [hidden email]
Subject: [Spark R]: dapply only works for very small datasets
 

Hello,


I tried to execute some user defined functions with R using the airline arrival performance dataset.

While the examples from the documentation for the `<-` apply operator work perfectly fine on a size ~9GB,

the `dapply` operator fails to finish even after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stackoverflow and even asked the question there as well, but till now the only answer I got was:
"Avoid using dapply, gapply"

So, do I miss some parameters or is there are general limitation?
I'm using Spark 2.2.0 and read the data from HDFS 2.7.1 and played with several DOPs.

Best
Andreas


Reply | Threaded
Open this post in threaded view
|

AW: [Spark R]: dapply only works for very small datasets

Kunft, Andreas

Thanks for the fast reply.


I tried it locally, with 1 - 8 slots on a 8 core machine w/ 25GB memory as well as on 4 nodes with the same specifications.

When I shrink the data to around 100MB, 

it runs in about 1 hour for 1 core and about 6 min with 8 cores.


I'm aware that the serDe takes time, but it seems there must be something else off considering these numbers.



Von: Felix Cheung <[hidden email]>
Gesendet: Montag, 27. November 2017 20:20
An: Kunft, Andreas; [hidden email]
Betreff: Re: [Spark R]: dapply only works for very small datasets
 
What’s the number of executor and/or number of partitions you are working with?

I’m afraid most of the problem is with the serialization deserialization overhead between JVM and R...


From: Kunft, Andreas <[hidden email]>
Sent: Monday, November 27, 2017 10:27:33 AM
To: [hidden email]
Subject: [Spark R]: dapply only works for very small datasets
 

Hello,


I tried to execute some user defined functions with R using the airline arrival performance dataset.

While the examples from the documentation for the `<-` apply operator work perfectly fine on a size ~9GB,

the `dapply` operator fails to finish even after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stackoverflow and even asked the question there as well, but till now the only answer I got was:
"Avoid using dapply, gapply"

So, do I miss some parameters or is there are general limitation?
I'm using Spark 2.2.0 and read the data from HDFS 2.7.1 and played with several DOPs.

Best
Andreas


Reply | Threaded
Open this post in threaded view
|

Re: [Spark R]: dapply only works for very small datasets

Felix Cheung
You can find more discussions in
https://issues.apache.org/jira/browse/SPARK-18924
And
https://issues.apache.org/jira/browse/SPARK-17634

I suspect the cost is linear - so partitioning the data into smaller chunks with more executors (one core each) running in parallel would probably help a bit.

Unfortunately this is an area that we really would use some improvements on, and I think it *should* be possible (hmm  https://databricks.com/blog/2017/10/06/accelerating-r-workflows-on-databricks.html. ;)

_____________________________
From: Kunft, Andreas <[hidden email]>
Sent: Tuesday, November 28, 2017 3:11 AM
Subject: AW: [Spark R]: dapply only works for very small datasets
To: Felix Cheung <[hidden email]>, <[hidden email]>


Thanks for the fast reply.


I tried it locally, with 1 - 8 slots on a 8 core machine w/ 25GB memory as well as on 4 nodes with the same specifications.

When I shrink the data to around 100MB, 

it runs in about 1 hour for 1 core and about 6 min with 8 cores.


I'm aware that the serDe takes time, but it seems there must be something else off considering these numbers.



Von: Felix Cheung <[hidden email]>
Gesendet: Montag, 27. November 2017 20:20
An: Kunft, Andreas; [hidden email]
Betreff: Re: [Spark R]: dapply only works for very small datasets
 
What’s the number of executor and/or number of partitions you are working with?

I’m afraid most of the problem is with the serialization deserialization overhead between JVM and R...


From: Kunft, Andreas <[hidden email]>
Sent: Monday, November 27, 2017 10:27:33 AM
To: [hidden email]
Subject: [Spark R]: dapply only works for very small datasets
 

Hello,


I tried to execute some user defined functions with R using the airline arrival performance dataset.

While the examples from the documentation for the `<-` apply operator work perfectly fine on a size ~9GB,

the `dapply` operator fails to finish even after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stackoverflow and even asked the question there as well, but till now the only answer I got was:
"Avoid using dapply, gapply"

So, do I miss some parameters or is there are general limitation?
I'm using Spark 2.2.0 and read the data from HDFS 2.7.1 and played with several DOPs.

Best
Andreas




Reply | Threaded
Open this post in threaded view
|

AW: [Spark R]: dapply only works for very small datasets

Kunft, Andreas

​Thanks alot.

I will have a lock at the issues


Von: Felix Cheung <[hidden email]>
Gesendet: Mittwoch, 29. November 2017 04:47
An: Kunft, Andreas; [hidden email]
Betreff: Re: [Spark R]: dapply only works for very small datasets
 
You can find more discussions in
https://issues.apache.org/jira/browse/SPARK-18924
And
https://issues.apache.org/jira/browse/SPARK-17634

I suspect the cost is linear - so partitioning the data into smaller chunks with more executors (one core each) running in parallel would probably help a bit.

Unfortunately this is an area that we really would use some improvements on, and I think it *should* be possible (hmm  https://databricks.com/blog/2017/10/06/accelerating-r-workflows-on-databricks.html. ;)

_____________________________
From: Kunft, Andreas <[hidden email]>
Sent: Tuesday, November 28, 2017 3:11 AM
Subject: AW: [Spark R]: dapply only works for very small datasets
To: Felix Cheung <[hidden email]>, <[hidden email]>


Thanks for the fast reply.


I tried it locally, with 1 - 8 slots on a 8 core machine w/ 25GB memory as well as on 4 nodes with the same specifications.

When I shrink the data to around 100MB, 

it runs in about 1 hour for 1 core and about 6 min with 8 cores.


I'm aware that the serDe takes time, but it seems there must be something else off considering these numbers.



Von: Felix Cheung <[hidden email]>
Gesendet: Montag, 27. November 2017 20:20
An: Kunft, Andreas; [hidden email]
Betreff: Re: [Spark R]: dapply only works for very small datasets
 
What’s the number of executor and/or number of partitions you are working with?

I’m afraid most of the problem is with the serialization deserialization overhead between JVM and R...


From: Kunft, Andreas <[hidden email]>
Sent: Monday, November 27, 2017 10:27:33 AM
To: [hidden email]
Subject: [Spark R]: dapply only works for very small datasets
 

Hello,


I tried to execute some user defined functions with R using the airline arrival performance dataset.

While the examples from the documentation for the `<-` apply operator work perfectly fine on a size ~9GB,

the `dapply` operator fails to finish even after ~4 hours.


I'm using a function similar to the one from the documentation:


df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stackoverflow and even asked the question there as well, but till now the only answer I got was:
"Avoid using dapply, gapply"

So, do I miss some parameters or is there are general limitation?
I'm using Spark 2.2.0 and read the data from HDFS 2.7.1 and played with several DOPs.

Best
Andreas