How default partitioning in Spark is deployed


How default partitioning in Spark is deployed

Renganathan Mutthiah
Hi,

I have a question about the default partitioning of an RDD.

case class Animal(id:Int, name:String)   
val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"), Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5, "Chetah") ) ))

Console println myRDD.getNumPartitions  


I am running the above piece of code on my laptop, which has 12 logical cores.
Hence I see that 12 partitions are created.

My understanding is that hash partitioning is used to determine which object goes to which partition, so in this case the formula would be hashCode() % 12.
But when I examine further, I see that all the records are placed in the last partition.

myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )

The above code prints the following (the first eleven partitions are empty and the last one has all the objects; the dashed lines separate the partition contents):
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
Animal(2,Elephant)
Animal(4,Tiger)
Animal(3,Jaguar)
Animal(5,Chetah)
Animal(1,Lion)

I don't know why this happens. Can you please help?

Thanks!

Re: How default partitioning in Spark is deployed

Mich Talebzadeh
Hi,

Well, as it appears, you have 5 entries in your data and 12 cores. The theory is that you run multiple tasks in parallel across multiple cores, which applies to your desktop case. With only 5 rows there are simply not enough statistics for a meaningful interpretation of why Spark appears to have put all the data in one partition. Bear in mind also that if an RDD has too many partitions, task scheduling can take more time than the actual execution. In summary, you just do not have enough data to draw a meaningful conclusion.

Try generating 100,000 rows, run your query, and look at the pattern.
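
For instance, something along these lines (an untested sketch, using mapPartitionsWithIndex just to count records per partition):

val bigRDD = session.sparkContext.parallelize(1 to 100000)
// count how many records land in each partition
bigRDD.mapPartitionsWithIndex( (index, itr) => Iterator((index, itr.size)) )
  .collect()
  .foreach { case (index, count) => println(s"Partition $index has $count records") }

With a large, evenly sliced input you should see roughly 100000/12 records in every partition.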

HTH




Re: How default partitioning in Spark is deployed

Renganathan Mutthiah
Hi Mich,

Thanks for your precious time looking into my query. Yes, when I increase the number of objects, all partitions start receiving data. I was actually trying to understand what happens in my particular case.

Thanks!


Re: How default partitioning in Spark is deployed

Attila Zsolt Piros
In reply to this post by Renganathan Mutthiah
Hi!

This is weird. The underlying parallelize call leads to ParallelCollectionRDD, which ends up in slice, where the most important part is the positions method:

 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
     (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
     }
 }

Because of the extra '(' you used in "parallelize( (Array", I thought some Scala implicit might generate a Seq containing a single Array.
But in that case your output would contain an Array, so that cannot be the case.

1) What Spark/Scala version are you using, and on what OS?

2) Can you reproduce the issue in the spark-shell?

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"), Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )
----------
----------
----------
Animal(1,Lion)
----------
----------
Animal(2,Elephant)
----------
----------
----------
Animal(3,Jaguar)
----------
----------
Animal(4,Tiger)
----------
----------
Animal(5,Chetah)

scala> Console println myRDD.getNumPartitions
12

3) Can you please check in the spark-shell what happens when you paste the above method and call it like this:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
     |   (0 until numSlices).iterator.map { i =>
     |     val start = ((i * length) / numSlices).toInt
     |     val end = (((i + 1) * length) / numSlices).toInt
     |     (start, end)
     |   }
     | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)
(0,0)
(0,0)
(0,1)
(1,1)
(1,2)
(2,2)
(2,2)
(2,3)
(3,3)
(3,4)
(4,4)
(4,5)

As you can see, in my case the `positions` result is consistent with the `foreachPartition` output, and this should be deterministic: element k goes to the slice whose [start, end) range contains k, so with 5 elements and 12 slices the records land in slices 2, 4, 7, 9 and 11.

Best regards,
Attila



Re: How default partitioning in Spark is deployed

Renganathan Mutthiah
Hi Attila,

Thanks for looking into this!

I actually found the issue, and it turned out that the print statements had misled me. The records are indeed stored in different partitions.
What happened is that since foreachPartition is run in parallel by different threads, they all printed the separator line at almost the same time, followed by the data, which was also printed at almost the same time. This gave the appearance that all the data is stored in a single partition. When I run the code below, I can see that the objects are of course stored in different partitions!

myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e => println("Index : " +index +" " + e)); itr}, true).collect()

It prints the following (the number after "Index :" is the partition number):
Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11 Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) 
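
Note: itr.foreach(...) exhausts the iterator, so the collect() above actually returns an empty result even though the printlns fire on the executors. A variant that maps each record to an (index, record) pair and prints on the driver (a sketch, not what I originally ran) avoids both that and the interleaved output:

myRDD.mapPartitionsWithIndex( (index, itr) => itr.map(e => (index, e)), true )
  .collect()
  .foreach { case (index, e) => println("Index : " + index + " " + e) }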

Thanks!


Re: How default partitioning in Spark is deployed

German Schiavon Matteo
Hi all,

I guess you could do something like this too: 

[screenshot attachment: Captura de pantalla 2021-03-16 a las 14.35.46.png]
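
The screenshot is not preserved in this archive. A guess at an equivalent, using the standard glom method (which gathers each partition's contents into an array):

myRDD.glom().collect().zipWithIndex.foreach { case (contents, index) =>
  println(s"Partition $index: " + contents.mkString(","))
}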


Re: How default partitioning in Spark is deployed

Renganathan Mutthiah
That's a very good idea, thanks for sharing, German!


Re: How default partitioning in Spark is deployed

Attila Zsolt Piros
In reply to this post by German Schiavon Matteo
Oh, sure, that was the reason. You can keep using `foreachPartition` and get the partition ID from the `TaskContext`:

scala> import org.apache.spark.TaskContext
import org.apache.spark.TaskContext

scala> myRDD.foreachPartition( e => {  println(TaskContext.getPartitionId + ":" + e.mkString(",")) } )
0:
1:
2:Animal(1,Lion)
3:
4:Animal(2,Elephant)
5:
6:
7:Animal(3,Jaguar)
8:
9:Animal(4,Tiger)
10:
11:Animal(5,Chetah)

scala>

 

