Issue with sortByKey.


Issue with sortByKey.

Archit Thakur
Hi,

I have 6 sequence files as input to my Spark code.
What I am doing is:
1. Create 6 individual RDDs from them.
2. Union them.
3. Apply some mapping.
4. Count the number of elements in the RDD.
5. Call sortByKey.

Now, in the logs I see:

14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)

This is the count step (step 4).

Doubt 1: Why 6 output partitions?

It then prints the progress for each of them:

14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)
14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 4)
14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 4221 ms on guavus-000392 (progress: 3/6)
14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
14/01/03 09:04:10 INFO cluster.ClusterTaskSetManager: Finished TID 0 in 5581 ms on guavus-000392 (progress: 4/6)
14/01/03 09:04:10 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/01/03 09:04:12 INFO cluster.ClusterTaskSetManager: Finished TID 3 in 7830 ms on guavus-000392 (progress: 5/6)
14/01/03 09:04:12 INFO scheduler.DAGScheduler: Completed ResultTask(0, 3)
14/01/03 09:04:20 INFO cluster.ClusterTaskSetManager: Finished TID 2 in 15786 ms on guavus-000392 (progress: 6/6)
14/01/03 09:04:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
14/01/03 09:04:20 INFO scheduler.DAGScheduler: Stage 0 (count at PreBaseCubeCreator.scala:96) finished in 16.320 s
14/01/03 09:04:20 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
14/01/03 09:04:20 INFO spark.SparkContext: Job finished: count


After that, when it gets to sortByKey:

14/01/03 09:04:33 INFO scheduler.DAGScheduler: Got job 2 (sortByKey at PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)

However, it should have been n output partitions, where n = the number of unique keys in the RDD, shouldn't it?

Thanks and Regards,
Archit Thakur.

Re: Issue with sortByKey.

Archit Thakur
I looked at the code of sortByKey:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {

If you don't specify numPartitions explicitly, it defaults to self.partitions.size, which comes from the getPartitions method of the RDD.

In this case that will be the RDD created in step 3. Isn't that wrong?



Re: Issue with sortByKey.

Andrew Ash
Hi Archit,

A partition is a chunk of data about the size of an HDFS block, not the data for a single key.  Because every partition is tracked individually and each is processed by a task on one CPU core, having a massive number of them causes slowdowns in the scheduler and elsewhere in the system.  About how much data are you looking at here?  If the source of your RDDs is in HDFS, how many HDFS blocks are required to hold the 6 RDDs?
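
To make the point concrete, here is a rough sketch in plain Scala (no Spark needed) of how a sort can place many distinct keys into a fixed number of key ranges. It is only a simplified model and is not Spark's actual partitioner; the names RangePartitionSketch, rangeBounds and partitionOf are made up for illustration.

```scala
// Simplified model of range partitioning. This is an illustration only,
// NOT Spark's own implementation; all names here are hypothetical.
object RangePartitionSketch {

  // Pick up to (numPartitions - 1) upper bounds from the sorted distinct keys.
  def rangeBounds(keys: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = keys.distinct.sorted.toVector
    if (sorted.isEmpty || numPartitions <= 1) Vector.empty
    else
      (1 until numPartitions)
        .map(i => sorted(math.min(sorted.size - 1, i * sorted.size / numPartitions)))
        .distinct
        .toVector
  }

  // A key belongs to the first range whose upper bound is greater than the key;
  // keys at or above the last bound go to the final partition.
  def partitionOf(key: Int, bounds: Vector[Int]): Int = {
    val idx = bounds.indexWhere(key < _)
    if (idx == -1) bounds.size else idx
  }

  def main(args: Array[String]): Unit = {
    val keys = 1 to 12 // 12 distinct keys
    val bounds = rangeBounds(keys, numPartitions = 6)
    // Group the keys by the partition they are assigned to and print each group.
    keys.groupBy(k => partitionOf(k, bounds)).toSeq.sortBy(_._1).foreach {
      case (p, ks) => println(s"partition $p -> ${ks.mkString(", ")}")
    }
  }
}
```

With 12 distinct keys and 6 partitions, each partition in this model ends up holding a range of two keys, so the number of partitions is independent of the number of unique keys.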

Andrew



Re: Issue with sortByKey.

Archit Thakur
I realized my mistake as soon as I posted it. I actually meant groupByKey, not sortByKey. And yes, you are right, it is consuming 6 HDFS blocks.

The issue I am facing is that when I do a groupBy, it reduces the number of unique keys in the RDD and also modifies them.

For example:

I have a custom data structure (DS).

Below is the set of unique keys in the baseRdd:

(40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
(40^0^0[2^1380^0]6[2[18^71.68.211.98:62510][2^WP]]
(40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
(40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
(40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
(40^0^0[2^1380^1383838476]6[2[18^71.68.211.98:62498][2^WP]]
(40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
(40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
(40^0^0[2^1380^1383839119]6[2[19^128.211.178.8:33448][2^WP]]
(40^0^0[2^1380^1383839294]6[2[19^71.75.156.224:36652][2^WP]]
(40^0^0[2^1380^1383839651]6[2[18^69.133.71.57:58320][2^WP]]
(43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]


And when I do a groupBy on the RDD, it gives me:

(40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
(40^0^0[2^1380^0]6[2[18^96.27.139.59:49412][2^WP]]
(40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
(40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
(40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
(40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
(40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
(43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]


Not only has it reduced the number of keys, it has also modified them.

The groupBy operation only uses the equals method of the key class (to check the equality of keys), right?



Re: Issue with sortByKey.

Andrew Ash

It probably uses hashCode too, so make sure those two methods are in sync.
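
As a rough illustration of what "in sync" means, here is a small plain-Scala sketch (no Spark involved). BrokenKey and GoodKey are hypothetical classes invented for this example, not the key class from the thread. Hash-based grouping looks a key up by hashCode first and only then calls equals, so two keys that are equal but hash differently can land in separate groups.

```scala
// Hypothetical key type: equals is overridden but hashCode is not,
// so two equal keys will usually report different hash codes.
class BrokenKey(val ip: String, val port: Int) {
  override def equals(other: Any): Boolean = other match {
    case k: BrokenKey => ip == k.ip && port == k.port
    case _            => false
  }
}

// Hypothetical key type with equals and hashCode derived from the same fields.
class GoodKey(val ip: String, val port: Int) {
  override def equals(other: Any): Boolean = other match {
    case k: GoodKey => ip == k.ip && port == k.port
    case _          => false
  }
  override def hashCode: Int = (ip, port).hashCode
}

object KeyContractSketch {
  def main(args: Array[String]): Unit = {
    val good = Seq(new GoodKey("70.197.1.165", 4554), new GoodKey("70.197.1.165", 4554))
    // Equal keys share a hash code, so they collapse into a single group.
    println(good.groupBy(identity).size)

    val broken = Seq(new BrokenKey("70.197.1.165", 4554), new BrokenKey("70.197.1.165", 4554))
    // Equal keys with unrelated hash codes usually end up in two groups.
    println(broken.groupBy(identity).size)
  }
}
```

This only demonstrates the equals/hashCode contract. It would explain duplicated or split groups, but by itself it does not explain keys whose contents appear to change.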

