GroupByKey implementation.

Archit Thakur

Below is the implementation of groupByKey in Spark 0.8.0:

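def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
    // Start a new buffer from the first value seen for a key
    def createCombiner(v: V) = ArrayBuffer(v)
    // Append each further value for that key to its buffer
    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
    // The third argument (mergeCombiners) is null, and map-side combining is off
    val bufs = combineByKey[ArrayBuffer[V]](
      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
    bufs.asInstanceOf[RDD[(K, Seq[V])]]
}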
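To see what the two closures do on their own, here is a small Spark-free sketch. `GroupByKeyClosures` and `buildCombiner` are hypothetical names, and the sketch assumes a combiner is only created once at least one value for the key has been seen. Note that `buf += v` appends in place and returns the same buffer, so `mergeValue` never copies the values collected so far.

```scala
import scala.collection.mutable.ArrayBuffer

// Spark-free copies of the closures groupByKey hands to combineByKey.
// GroupByKeyClosures and buildCombiner are hypothetical helper names.
object GroupByKeyClosures {
  // First value seen for a key starts a new buffer
  def createCombiner[V](v: V): ArrayBuffer[V] = ArrayBuffer(v)

  // Later values are appended in place; += returns the same buffer
  def mergeValue[V](buf: ArrayBuffer[V], v: V): ArrayBuffer[V] = buf += v

  // Build one key's combiner: createCombiner on the first value,
  // mergeValue on each remaining one (values must be non-empty)
  def buildCombiner[V](values: Seq[V]): ArrayBuffer[V] =
    values.tail.foldLeft(createCombiner(values.head)) { (buf, v) => mergeValue(buf, v) }
}
```

For example, `buildCombiner(Seq("a", "b", "c"))` yields `ArrayBuffer("a", "b", "c")`.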

And here is combineValuesByKey (Aggregator.scala):

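def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
    // One combiner per key, built from the raw (K, V) records of this partition
    val combiners = new JHashMap[K, C]
    for (kv <- iter) {
      val oldC = combiners.get(kv._1)
      if (oldC == null) {
        // First value for this key: start a new combiner
        combiners.put(kv._1, createCombiner(kv._2))
      } else {
        // Key already seen: fold the value into its existing combiner
        combiners.put(kv._1, mergeValue(oldC, kv._2))
      }
    }
    // ... (rest of the method not included in the post)
}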
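The per-partition loop can be re-created outside Spark to check how it behaves on a single partition. This is a minimal sketch: `LocalAggregator` is a hypothetical name, it takes plain `(K, V)` tuples instead of `Product2`, and it returns an immutable `Map` rather than an iterator, since the end of the original method is not shown in the post.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.ArrayBuffer

// Hypothetical Spark-free stand-in for the combineValuesByKey loop.
// Only createCombiner and mergeValue are needed; no combiner is ever
// merged with another combiner inside this loop.
final case class LocalAggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C) {

  def combineValuesByKey(iter: Iterator[(K, V)]): Map[K, C] = {
    val combiners = new JHashMap[K, C]
    for ((k, v) <- iter) {
      val oldC = combiners.get(k)
      // JHashMap.get returns null for a key it has not seen yet
      if (oldC == null) combiners.put(k, createCombiner(v))
      else combiners.put(k, mergeValue(oldC, v))
    }
    // Copy into an immutable Scala Map for easy inspection
    val out = Map.newBuilder[K, C]
    val it = combiners.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      out += (e.getKey -> e.getValue)
    }
    out.result()
  }
}
```

Fed with groupByKey's closures (`v => ArrayBuffer(v)` and `(buf, v) => buf += v`) and the records `("a", 1), ("b", 2), ("a", 3)`, it produces `a -> ArrayBuffer(1, 3)` and `b -> ArrayBuffer(2)`.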

My question is why null is passed for the mergeCombiners closure.

If two different partitions contain the same key, wouldn't their combiners need to be merged afterwards?
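One way to reason about this is with a toy, single-JVM model of the shuffle. It assumes, as a hash partitioner does, that every record with a given key is routed to the same reduce partition; `ShuffleSketch` and its methods are hypothetical and ignore serialization, spilling, and everything else Spark's real shuffle does. In the model, without map-side combining the raw records from all map partitions meet at one reducer, so `createCombiner` and `mergeValue` suffice; with map-side combining, each map partition ships its own partial buffer, and a `mergeCombiners` step becomes necessary.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the shuffle behind combineByKey (hypothetical names).
object ShuffleSketch {
  type Rec = (String, Int)

  // Hash-partitioner contract: a key always maps to one reduce partition
  // (floorMod keeps negative hash codes in range).
  def reducePartition(key: String, numReduce: Int): Int =
    Math.floorMod(key.hashCode, numReduce)

  // mapSideCombine = false: ship raw records, then build buffers on the
  // reduce side with createCombiner/mergeValue only.
  def withoutMapSideCombine(maps: Seq[Seq[Rec]], numReduce: Int): Seq[Map[String, ArrayBuffer[Int]]] = {
    val shuffled = Seq.fill(numReduce)(ArrayBuffer.empty[Rec])
    for (part <- maps; rec <- part) shuffled(reducePartition(rec._1, numReduce)) += rec
    shuffled.map { recs =>
      recs.foldLeft(Map.empty[String, ArrayBuffer[Int]]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case None      => acc + (k -> ArrayBuffer(v)) // createCombiner
          case Some(buf) => buf += v; acc               // mergeValue
        }
      }
    }
  }

  // mapSideCombine = true: each map partition builds its own buffers, so a
  // reducer may receive several buffers per key and must merge them.
  def withMapSideCombine(maps: Seq[Seq[Rec]], numReduce: Int): Seq[Map[String, ArrayBuffer[Int]]] = {
    val shuffled = Seq.fill(numReduce)(ArrayBuffer.empty[(String, ArrayBuffer[Int])])
    for (part <- maps) {
      val local = part.groupBy(_._1).map { case (k, rs) => k -> (ArrayBuffer.empty[Int] ++= rs.map(_._2)) }
      for ((k, buf) <- local) shuffled(reducePartition(k, numReduce)) += (k -> buf)
    }
    shuffled.map { combs =>
      combs.foldLeft(Map.empty[String, ArrayBuffer[Int]]) { case (acc, (k, c)) =>
        acc.get(k) match {
          case None        => acc + (k -> c)
          case Some(other) => acc + (k -> (other ++= c)) // mergeCombiners
        }
      }
    }
  }
}
```

With map partitions `[("a", 1), ("b", 2), ("a", 3)]` and `[("a", 4), ("c", 5)]`, both paths end with `a -> ArrayBuffer(1, 3, 4)`, but only the map-side-combine path ever reaches the merge branch.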