How to create RDDs from another RDD?

How to create RDDs from another RDD?

maasg
The RDD API has functions to join multiple RDDs, such as PairRDD.join or PairRDD.cogroup, that take another RDD as input, e.g. firstRDD.join(secondRDD).
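
For context, a minimal sketch of such a join, assuming an existing SparkContext sc and made-up pairs (on Spark 1.x, import org.apache.spark.SparkContext._ brings in the pair functions):

// firstRDD and secondRDD are illustrative, not real data
val firstRDD  = sc.parallelize(Seq(("k1", 1), ("k2", 2)))
val secondRDD = sc.parallelize(Seq(("k1", "a"), ("k3", "c")))
firstRDD.join(secondRDD)   // yields ("k1", (1, "a")): only keys present on both sides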

I'm looking for ways to do the opposite: split an existing RDD. What is the right way to create derivative RDDs from an existing RDD?

e.g. imagine I have a collection of pairs as input: colRDD = (k1->v1)...(kx->vy)...
I could do:
val byKey = colRDD.groupByKey()  // = (k1 -> (k1->v1, ..., k1->vn)), ..., (kn -> (kn->vy, ...))

Now, I'd like to create an RDD from the values to have something like:

val groupedRDDs = (k1->RDD(k1->v1,...k1->vn), kn -> RDD(kn->vy, ...))

In this example, there is some f such that f(byKey) = groupedRDDs. What is that f?

Would byKey.map{case (k, v) => k -> sc.makeRDD(v.toSeq)} be the right/recommended way to do this? Any other options?

Thanks,

Gerard.

Re: How to create RDDs from another RDD?

Andrew Ash
Hi Gerard,

Usually when I want to split one RDD into several, I'm better off re-thinking the algorithm to do all the computation at once.  Example:

Suppose you had a dataset that was the tuple (URL, webserver, pageSizeBytes), and you wanted to find out the average page size that each webserver (e.g. Apache, nginx, IIS, etc.) served. Rather than splitting your allPagesRDD into an RDD for each webserver, like nginxRDD, apacheRDD, and IISRDD, it's probably better to do the average computation over all of them at once, like this:

// allPagesRDD is (URL, webserver, pageSizeBytes)
allPagesRDD.keyBy(getWebserver)                          // (webserver, record)
  .mapValues(r => (r.pageSizeBytes, 1L))                 // (webserver, (size, count))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))     // sum sizes and counts per server
  .mapValues { case (sum, n) => sum.toDouble / n }       // average page size per server
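
A self-contained version of the same idea, for reference (Page, the sample records, and the local master are illustrative assumptions, not part of the original snippet):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair RDD functions (Spark 1.x)

// Illustrative record type standing in for (URL, webserver, pageSizeBytes)
case class Page(url: String, webserver: String, pageSizeBytes: Long)

object AvgPageSizePerServer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("avg-page-size").setMaster("local[*]"))
    val allPagesRDD = sc.parallelize(Seq(
      Page("http://a", "nginx", 1000L),
      Page("http://b", "nginx", 3000L),
      Page("http://c", "apache", 500L)))

    val avgByServer = allPagesRDD
      .map(p => (p.webserver, (p.pageSizeBytes, 1L)))      // (server, (size, count))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))   // sum sizes and counts
      .mapValues { case (sum, n) => sum.toDouble / n }     // average per server

    avgByServer.collect().foreach(println)   // e.g. (nginx,2000.0), (apache,500.0)
    sc.stop()
  }
}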

For this example you could use something like Summingbird to keep from doing the average tracking yourself.

Can you go into more detail about why you want to split one RDD into several?


On Mon, Jun 2, 2014 at 1:13 PM, Gerard Maas <[hidden email]> wrote:
The RDD API has  functions to join multiple RDDs, such as PariRDD.join or PariRDD.cogroup that take another RDD as input. e.g.  firstRDD.join(secondRDD)

I'm looking for ways to do the opposite: split an existing RDD. What is the right way to create derivate RDDs from an existing RDD? 

e.g. imagine I've an  collection or pairs as input: colRDD =  (k1->v1)...(kx->vy)...
I could do:
val byKey = colRDD.groupByKey() = (k1->(k1->v1... k1->vn)),...(kn->(kn->vy, ...))

Now, I'd like to create an RDD from the values to have something like:

val groupedRDDs = (k1->RDD(k1->v1,...k1->vn), kn -> RDD(kn->vy, ...))

in this example, there's an f(byKey) = groupedRDDs.  What's that f(x) ?

Would:  byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}  the right/recommended way to do this?  Any other options?

Thanks,

Gerard.

Reply | Threaded
Open this post in threaded view
|

Re: How to create RDDs from another RDD?

maasg
Hi Andrew,

Thanks for your answer.

The reason for the question: I've been trying to contribute to the community by helping answer Spark-related questions on Stack Overflow.

(A note on that: given the growing volume on the user list lately, I think support will need to scale out to other venues, so helping out on SO will further contribute to Spark going mainstream.)

I came across this question [1] on how to save parts of an RDD to different HDFS files. I looked into the implementation of saveAsTextFile. The delegation path terminates in PairRDD.saveAsHadoopDataset, and the implementation looks quite tightly coupled to the RDD's data, so potentially the easiest way to solve the problem at hand is to create several RDDs from the original RDD.

The issue I see is that 'sc.makeRDD(v.toSeq)' will potentially blow up when trying to materialize the iterable into a Seq. I also don't know what the behaviour of a call to the SparkContext will be on a remote worker.
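
(For reference, a rough sketch of the usual driver-side alternative that avoids calling the SparkContext inside a closure; String keys/values and a small set of distinct keys are my assumptions:)

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._   // pair RDD functions (Spark 1.x)

// Derive one RDD per key by filtering the original; no SparkContext in any closure.
// Cache colRDD first if there are many keys, since each filter rescans it.
def splitByKey(colRDD: RDD[(String, String)]): Map[String, RDD[(String, String)]] = {
  val keys = colRDD.keys.distinct().collect()   // distinct keys come back to the driver
  keys.map(k => k -> colRDD.filter { case (key, _) => key == k }).toMap
}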

My current conclusion is that the best option would be to roll my own saveHdfsFile(...).

Would you agree?

-greetz, Gerard.

Re: How to create RDDs from another RDD?

Andrew Ash
Hmm, that sounds like it could be done with a custom OutputFormat, but I'm not familiar enough with custom OutputFormats to say whether that's the right thing to do.
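
For what it's worth, a minimal sketch of that direction using Hadoop's MultipleTextOutputFormat; the class name, the output path, and String keys/values are my own placeholders:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._   // pair RDD functions (Spark 1.x)

// Routes each (key, value) record to a file named after its key.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
  // Write only the value into the file; drop the key from the contents.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

// pairs: RDD[(String, String)], e.g. the (k -> v) collection from the first message
def saveByKey(pairs: RDD[(String, String)]): Unit =
  pairs.saveAsHadoopFile("hdfs:///out/by-key", classOf[String], classOf[String],
    classOf[KeyBasedOutput])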

