How to create RDDs from another RDD?

4 messages

How to create RDDs from another RDD?

The RDD API has functions to join multiple RDDs, such as PairRDD.join or PairRDD.cogroup, that take another RDD as input, e.g. firstRDD.join(secondRDD).

I'm looking for ways to do the opposite: split an existing RDD. What is the right way to create derived RDDs from an existing RDD?

For example, imagine I have a collection of pairs as input:

    colRDD = (k1->v1)...(kx->vy)...

I could do:

    val byKey = colRDD.groupByKey() = (k1->(k1->v1... k1->vn)),...(kn->(kn->vy, ...))

Now, I'd like to create an RDD from the values, to have something like:

    val groupedRDDs = (k1->RDD(k1->v1,...k1->vn), kn -> RDD(kn->vy, ...))

In this example, there's an f(byKey) = groupedRDDs. What's that f(x)?

Would

    byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}

be the right/recommended way to do this? Any other options?

Thanks,
Gerard.

Re: How to create RDDs from another RDD?

Hi Gerard,

Usually when I want to split one RDD into several, I'm better off re-thinking the algorithm to do all the computation at once.

Example: suppose you had a dataset of tuples (URL, webserver, pageSizeBytes), and you wanted to find the average page size that each webserver (e.g. Apache, nginx, IIS, etc.) served. Rather than splitting your allPagesRDD into an RDD for each webserver, like nginxRDD, apacheRDD, IISRDD, it's probably better to do the average computation over all of them at once, like this:

    // allPagesRDD is (URL, webserver, pageSizeBytes)
    // keyBy yields (webserver, page), so the later steps work on the values
    allPagesRDD.keyBy(getWebserver)
      .mapValues(page => (page.pageSizeBytes, 1))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(v => v._1.toDouble / v._2)

For this example you could use something like Summingbird to keep from doing the average tracking yourself.

Can you go into more detail about why you want to split one RDD into several?

On Mon, Jun 2, 2014 at 1:13 PM, Gerard Maas wrote:

> The RDD API has functions to join multiple RDDs, such as PairRDD.join or PairRDD.cogroup, that take another RDD as input, e.g. firstRDD.join(secondRDD).
>
> I'm looking for ways to do the opposite: split an existing RDD. What is the right way to create derived RDDs from an existing RDD?
>
> For example, imagine I have a collection of pairs as input:
>
>     colRDD = (k1->v1)...(kx->vy)...
>
> I could do:
>
>     val byKey = colRDD.groupByKey() = (k1->(k1->v1... k1->vn)),...(kn->(kn->vy, ...))
>
> Now, I'd like to create an RDD from the values, to have something like:
>
>     val groupedRDDs = (k1->RDD(k1->v1,...k1->vn), kn -> RDD(kn->vy, ...))
>
> In this example, there's an f(byKey) = groupedRDDs. What's that f(x)?
>
> Would
>
>     byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
>
> be the right/recommended way to do this? Any other options?
>
> Thanks,
> Gerard.
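
For reference, the single-pass average described in the reply could be sketched as a small self-contained program along the following lines. This is only an illustration under some assumptions: the Page case class, the AveragePageSize object, and the sample data are hypothetical names that do not appear in the thread, and the code assumes a reasonably recent Spark version running in local mode (older 1.x releases also need import org.apache.spark.SparkContext._ for the pair-RDD functions). The splitByKey helper shows one possible way to get a separate RDD per key if that is really needed: it builds one filter per key on the driver, because a SparkContext cannot be used inside a transformation such as map.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical record type standing in for the (URL, webserver, pageSizeBytes) tuple.
case class Page(url: String, webserver: String, pageSizeBytes: Long)

object AveragePageSize {

  // Average page size per webserver, computed in one pass over the data.
  def averageByWebserver(pages: RDD[Page]): RDD[(String, Double)] =
    pages
      .map(p => (p.webserver, (p.pageSizeBytes, 1L)))      // (key, (sum, count))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))   // add sums and counts
      .mapValues(v => v._1.toDouble / v._2)                // sum / count

  // One RDD per key, built on the driver with one filter per distinct key.
  // Each resulting RDD rescans the parent, so caching the parent first may help.
  def splitByKey(pages: RDD[Page]): Map[String, RDD[Page]] = {
    val keys = pages.map(_.webserver).distinct().collect()
    keys.map(k => k -> pages.filter(_.webserver == k)).toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("average-page-size").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data, for illustration only.
      val pages = sc.parallelize(Seq(
        Page("http://a.example/1", "nginx", 100L),
        Page("http://a.example/2", "nginx", 300L),
        Page("http://b.example/1", "Apache", 50L)
      ))
      averageByWebserver(pages).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The single-pass version shuffles only one (sum, count) pair per key and partition, whereas splitByKey launches a job to collect the keys and then scans the parent RDD once per key whenever the derived RDDs are evaluated, so it is probably only reasonable when the number of distinct keys is small.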