
Partitions - Distribute By - MapPartitions


Partitions - Distribute By - MapPartitions

Balaji Krishnan
Hello

We have a source table with approximately 45 million rows. Each row has a KEY column, and in total there are 25,000 distinct keys. I would like to process all of the keys in parallel, so I used the following Spark SQL to cluster the rows by key:

hiveContext.sql("select * from BigTbl DISTRIBUTE by KEY")

Subsequently, using mapPartitions, we process each and every row in all the partitions. I have a variable that should be initialized once per partition; let's call it designName. It is initialized with the KEY value of the partition for future identification purposes. We do other things inside mapPartitions that I am not covering here.
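
Roughly, the per-partition processing looks like this (a simplified sketch, not the real logic; it assumes KEY is a bigint and that every row in a partition carries the same KEY):

val distributed = hiveContext.sql("select * from BigTbl DISTRIBUTE by KEY")
val processed = distributed.rdd.mapPartitions { iter =>
  if (iter.hasNext) {
    val first = iter.next()
    // Initialized once per partition; assumes all rows here share one KEY.
    val designName = first.getAs[Long]("KEY").toString
    (Iterator(first) ++ iter).map(row => (designName, row))
  } else {
    Iterator.empty
  }
}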

Since there are far more than 200 distinct keys (200 being the default value of spark.sql.shuffle.partitions), I had to change that setting. There are two options that I was advised to use:

1. Count the distinct keys and repartition to that number:

long distinctValues = hiveContext.sql("select KEY from BigTbl").distinct().count();
JavaRDD<Row> partitionedRecs = hiveContext.sql("select * from BigTbl DISTRIBUTE by SKU")
        .repartition((int) distinctValues).toJavaRDD().cache();

2. Raise spark.sql.shuffle.partitions well above the number of distinct keys:

new JavaSparkContext(new SparkConf()
        .set("spark.driver.maxResultSize", "0")
        .set("spark.sql.shuffle.partitions", "26000"));
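
(The same setting can also be applied per session from SQL, which is what I do in the smaller reproduction below: hiveContext.sql("set spark.sql.shuffle.partitions=26000").)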

With both options, designName does not get the right value across all partitions. To troubleshoot and demonstrate this on a smaller scale, I identified 3 KEY values and ran the test on just those.
[Screenshots: the select statement against the source table and its results, showing the row count per key]

For the above 3 keys, I expect those row counts to land in individual partitions. The Spark code is as follows:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Use exactly as many shuffle partitions as distinct keys in the test.
hiveContext.sql("set spark.sql.shuffle.partitions=3")
val mydata = hiveContext.sql("select * from BigTbl where KEY in (29890050079, 29890050083, 29890050094) DISTRIBUTE by SKU")
// Emit the number of rows that landed in each partition.
val sz = mydata.rdd.mapPartitions(iter => Array(iter.size).iterator, true)
sz.take(3)
[Screenshot: take(3) output]

Note that I get output for 3 partitions, but one count is ZERO: two of the keys are clubbed together into a single partition, while only one partition's count matches its key.
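
My assumption (please correct me if I am wrong) is that DISTRIBUTE BY routes each row by something like hash(key) % spark.sql.shuffle.partitions, so with only 3 buckets two distinct keys can collide while another bucket stays empty. A generic illustration of the effect, not Spark's actual hash function:

val demoKeys = Seq(101L, 104L, 202L)
// Bucket by a simple hash mod 3: 101 and 104 collide in bucket 2,
// 202 gets bucket 1 to itself, and bucket 0 receives nothing.
demoKeys.groupBy(k => ((k.hashCode % 3) + 3) % 3).foreach {
  case (bucket, ks) => println(s"bucket $bucket -> $ks")
}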

If spark.sql.shuffle.partitions is instead changed to a bigger arbitrary number, say 8:
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// More shuffle partitions than distinct keys in the test.
hiveContext.sql("set spark.sql.shuffle.partitions=8")
val mydata = hiveContext.sql("select * from BigTbl where KEY in (29890050079, 29890050083, 29890050094) DISTRIBUTE by SKU")
val sz = mydata.rdd.mapPartitions(iter => Array(iter.size).iterator, true)
sz.take(8)
[Screenshot: take(8) output]

In this case each key's count matches, and no two keys are clubbed into the same partition (the remaining partitions are simply empty).

My question is: how can we set the number of partitions so that every distinct key reliably lands in its own partition and all of them can be processed uniquely?
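
For illustration, what I am effectively after at the RDD level is something like a custom partitioner that gives each distinct KEY its own partition. A rough, untested sketch (ExactKeyPartitioner is a name I made up, and it again assumes KEY is a bigint); is something like this the right direction, or is there a SQL-level way?

import org.apache.spark.Partitioner

// Hypothetical partitioner: exactly one partition per distinct key.
class ExactKeyPartitioner(keyIndex: Map[Long, Int]) extends Partitioner {
  override def numPartitions: Int = keyIndex.size
  override def getPartition(key: Any): Int = keyIndex(key.asInstanceOf[Long])
}

val keys = mydata.rdd.map(_.getAs[Long]("KEY")).distinct().collect()
val keyIndex = keys.zipWithIndex.toMap
val byKey = mydata.rdd
  .keyBy(_.getAs[Long]("KEY"))
  .partitionBy(new ExactKeyPartitioner(keyIndex))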

Our Spark environment is the following:
Apache Spark - 1.6.1
Hadoop - 2.6.2
Hive - 1.2.1

Thanks very much for your help.

Regards

Bala

Re: Partitions - Distribute By - MapPartitions

Balaji Krishnan
Does anyone have suggestions on how this can be addressed, please?

Regards

Balaji Krishnan