RDD and Partition

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

RDD and Partition

David Thomas
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.

Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

MLnick
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.


Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Mark Hamstra
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.



Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

David Thomas
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?


On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.




Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Mark Hamstra
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.





Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Mark Hamstra
Doesn't avoid an 'if' on every partition, but does avoid it on every element of every partition.


On Tue, Jan 28, 2014 at 1:29 PM, Mark Hamstra <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.






Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Christopher Nguyen
In reply to this post by Mark Hamstra

David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if you're sure it's necessary and sufficient, and not over optimization.

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.





Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Mark Hamstra
SparkContext#runJob is the basis of an RDD action, so the result of using runJob to call toUpperCase on the A-to-M partitions will be the uppercased strings materialized in the driver process, not a transformation of the original RDD.


On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <[hidden email]> wrote:

David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if you're sure it's necessary and sufficient, and not over optimization.

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.






Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Mark Hamstra
In reply to this post by Mark Hamstra
Of course, there's an excess dot in `rdd26..mapPartitionsWithIndex{}`.


On Tue, Jan 28, 2014 at 1:29 PM, Mark Hamstra <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.






Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Christopher Nguyen
In reply to this post by Mark Hamstra

Hence the qualification to determine whether it is necessary *and* sufficient, depending on what David is trying to do overall :)

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 2:10 PM, "Mark Hamstra" <[hidden email]> wrote:
SparkContext#runJob is the basis of an RDD action, so the result of using runJob to call toUpperCase on the A-to-M partitions will be the uppercased strings materialized in the driver process, not a transformation of the original RDD.


On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <[hidden email]> wrote:

David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if you're sure it's necessary and sufficient, and not over optimization.

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.






Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

David Thomas
Thanks for those tips.

I was looking into the docs for PartitionPruningRDD. It says, "A RDD used to prune RDD partitions/partitions so we can avoid launching tasks on all partitions". I did not understand this exactly and I couldn't find any sample code. Can we use this to apply a function only on certain partitions?


On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen <[hidden email]> wrote:

Hence the qualification to determine whether it is necessary *and* sufficient, depending on what David is trying to do overall :)

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 2:10 PM, "Mark Hamstra" <[hidden email]> wrote:
SparkContext#runJob is the basis of an RDD action, so the result of using runJob to call toUpperCase on the A-to-M partitions will be the uppercased strings materialized in the driver process, not a transformation of the original RDD.


On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <[hidden email]> wrote:

David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if you're sure it's necessary and sufficient, and not over optimization.

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.







Reply | Threaded
Open this post in threaded view
|

Re: RDD and Partition

Christopher Nguyen

David, a PPRDD (mobile abbrev) is a new RDD that contains a subset of partitions of the original RDD. Subsequent transformations/operations will only see this subset. So yes, it may do what you need, or not, depending on whether you still need to do something with the other partitions, as implied by Mark. You can of course still refer to the original RDD, or create yet another PPRDD containing that other subset, etc., just as you can call sc.runJob() on different partitions each time.

I'm sure you can decide which pattern best fits your use case. Beware of over optimizing leading to unnecessary complexity, though I've also learned not to underestimate others' real needs based on their toy examples.

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 3:41 PM, "David Thomas" <[hidden email]> wrote:
Thanks for those tips.

I was looking into the docs for PartitionPruningRDD. It says, "A RDD used to prune RDD partitions/partitions so we can avoid launching tasks on all partitions". I did not understand this exactly and I couldn't find any sample code. Can we use this to apply a function only on certain partitions?


On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen <[hidden email]> wrote:

Hence the qualification to determine whether it is necessary *and* sufficient, depending on what David is trying to do overall :)

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 2:10 PM, "Mark Hamstra" <[hidden email]> wrote:
SparkContext#runJob is the basis of an RDD action, so the result of using runJob to call toUpperCase on the A-to-M partitions will be the uppercased strings materialized in the driver process, not a transformation of the original RDD.


On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <[hidden email]> wrote:

David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if you're sure it's necessary and sufficient, and not over optimization.

Sent while mobile. Pls excuse typos etc.

On Jan 28, 2014 1:30 PM, "Mark Hamstra" <[hidden email]> wrote:
If I'm understanding you correctly, there's lots of ways you could do that.  Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <[hidden email]> wrote:
Thank you! That helps.

A follow up question on this. How can I apply a function only on a subset of this RDD. Lets say, I need all strings starting in the range 'A' - 'M' be applied toUpperCase and not touch the remaining. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <[hidden email]> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <[hidden email]> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <[hidden email]> wrote:
Lets say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A gets collected on machine1, B on machine2 and so on.