RepartitionByKey Behavior

RepartitionByKey Behavior

Chawla,Sumit
Hi 

 I have been trying a simple operation: I want to land all values with the same key in the same partition, and never have two different keys share a partition. Is this possible? Keys b and c always end up mixed together in the same partition.


from pyspark.rdd import portable_hash

rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9), ('b', 3), ('c', 8)])

n = 4

def partitioner(n):
    """Partition by the first item in the key tuple."""
    def partitioner_(x):
        val = x[0]
        key = portable_hash(x[0])
        # Spark maps this return value onto a partition as key % numPartitions,
        # so two distinct hash values can still land in the same partition.
        print("Val %s Assigned Key %s" % (val, key))
        return key
    return partitioner_

def validate(part):
    # Flag any record whose key differs from the first key seen in the partition.
    last_key = None
    for p in part:
        k = p[0]
        if last_key is None:
            last_key = k
        if k != last_key:
            print("Mixed keys in partition %s %s" % (k, last_key))

partitioned = (rdd
    .keyBy(lambda kv: (kv[0], kv[1]))
    .repartitionAndSortWithinPartitions(
        numPartitions=n, partitionFunc=partitioner(n), ascending=False)
    .map(lambda x: x[1]))

print(partitioned.getNumPartitions())
partitioned.foreachPartition(validate)


Val a Assigned Key -7583489610679606711
Val a Assigned Key -7583489610679606711
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Val c Assigned Key 1421958803217889556
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Mixed keys in partition b c
Mixed keys in partition b c


Regards
Sumit Chawla


Re: RepartitionByKey Behavior

Chawla,Sumit
Based on reading the code, it looks like Spark takes the partitioner's result modulo the partition count. The hashes of c and b end up mapping to the same partition. What's the best partitioning scheme to deal with this?
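
That matches the numbers above. Spark maps each hash onto a bucket as hash % numPartitions; plugging in the printed values (the hashes vary with PYTHONHASHSEED, but the collision mechanism is the same):

n = 4
hashes = {'a': -7583489610679606711, 'd': 2755936516345535118,
          'b': -1175849324817995036, 'c': 1421958803217889556}
for k, h in hashes.items():
    print(k, h % n)
# a -> 1, d -> 2, b -> 0, c -> 0: 'b' and 'c' collide in partition 0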

Regards
Sumit Chawla



Re: RepartitionByKey Behavior

Jungtaek Lim
It is not possible in general, because the cardinality of the partitioning key is not known ahead of time, while the partition count must be fixed. Whenever cardinality > partition count, at least two keys are forced to share a partition, so the system can't guarantee the requirement.

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: RepartitionByKey Behavior

Elior Malul
Hi Chawla,
There is nothing wrong with your code, nor with Spark.

The situation in which two different keys are mapped to the same partition is perfectly valid, since they hash to the same 'bucket'.

The promise is only that all records with the same key 'k' will be mapped to the same partition, not that each partition holds a single key.
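
To see the actual guarantee, a quick sketch (assuming the same SparkContext sc as the original post; glom collects each partition into a list):

pairs = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9), ('b', 3), ('c', 8)])
for i, part in enumerate(pairs.partitionBy(4).glom().collect()):
    print(i, part)
# Every ('a', ...) record lands in one partition, every ('b', ...) in one
# partition, and so on -- but nothing stops 'b' and 'c' from sharing a bucket.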


Re: RepartitionByKey Behavior

Nathan Kronenfeld-2
I think you could do something approximately like:

     val keys = rdd.map(_.getKey).distinct.zipWithIndex
     val numKeys = keys.count.toInt
     rdd.map(r => (r.getKey, r)).join(keys)                   // (key, (record, index))
       .map { case (_, (record, index)) => (index, record) }  // re-key by the index
       .partitionBy(new Partitioner() { def numPartitions = numKeys; def getPartition(key: Any) = key.asInstanceOf[Long].toInt })

i.e., assign each distinct key a unique index, count the keys, and repartition by that index into exactly that many partitions. This presumes, of course, that the number of keys is < Int.MaxValue.

Also, I haven't tested this code, so don't take it as anything more than an approximate idea, please :-)

                     -Nathan Kronenfeld

Re: RepartitionByKey Behavior

Chawla,Sumit
Thanks everyone. As Nathan suggested, I ended up collecting the distinct keys first and then assigning an ID to each key explicitly.
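
For reference, a minimal PySpark sketch of that approach against the thread's example rdd (untested; key_to_id is an illustrative name):

distinct_keys = rdd.keys().distinct().collect()
key_to_id = {k: i for i, k in enumerate(distinct_keys)}

# One partition per distinct key; the partitioner is an exact lookup,
# so two distinct keys can never share a bucket.
partitioned = rdd.partitionBy(len(distinct_keys), partitionFunc=lambda k: key_to_id[k])
print(partitioned.glom().collect())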

Regards
Sumit Chawla

