Hash Join in Spark

Hash Join in Spark

rose
Hi all,

I want to know more about the join operation in Spark. I know it uses a hash join, but I am not able to figure out the nature of the implementation, such as blocking vs. non-blocking, or shared vs. non-shared partitions.

If anyone knows, please reply to this post along with links to the implementation in the Spark source files.

Thanks,
rose

Re: Hash Join in Spark

rose
Since my earlier question is still unanswered, I have decided to dig into the Spark code myself. However, I am new to Spark as well as Scala. Can someone help me understand the following code snippet:

1. def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
2.   val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
3.   val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
4.   prfs.mapValues { case Seq(vs, ws) =>
       (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
5.   }
6. }


Thanks,
rose


Re: Hash Join in Spark

Aaron Davidson
This method is doing very little. Line 2 constructs the CoGroupedRDD, which will do all the real work. Note that while this cogroup function only groups 2 RDDs together, CoGroupedRDD allows general n-way cogrouping, so it takes a Seq[RDD[(K, _)]] rather than just 2 such key-value RDDs.

The rest of the code in this method is simply converting the result of CoGroupedRDD back from its generalized form into an RDD[(K, (Seq[V], Seq[W]))]. (CoGroupedRDD returns an RDD[(K, Seq[Seq[_]])], as there are n of those Seq[_]s, one per grouping RDD.) To go over some of the finer points of the remaining lines:

3. This line is actually not necessary, and is simply confusing. I have submitted a small patch to remove it.*

4. mapValues will iterate through the results of the CoGroupedRDD (i.e., the already-cogrouped values) in order to change the type of the return value from the generic Seq[Seq[_]] to a (Seq[V], Seq[W]), since we know each Seq has exactly 2 elements. The remainder of this line simply does the casting from Seq[_] to Seq[V] or Seq[W] as appropriate.
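
To make the result shape concrete, here is a small, hedged usage sketch (my own illustration, assuming an already-created SparkContext named sc; the exact collection type of the grouped values depends on the Spark version):

import org.apache.spark.SparkContext._   // pair RDD functions such as cogroup

// Hypothetical data; sc is an existing SparkContext.
val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val orders = sc.parallelize(Seq((1, "book"), (1, "pen"), (3, "mug")))

// 2-way cogroup: every key from either side appears once, paired with the
// values it has on each side (an empty Seq if the key is missing on that side).
val grouped = users.cogroup(orders)
// (1, (Seq("alice"), Seq("book", "pen")))
// (2, (Seq("bob"),   Seq()))
// (3, (Seq(),        Seq("mug")))
// A 3-way overload, cogroup(other1, other2), also exists, which is what the
// Seq[RDD]-based CoGroupedRDD makes possible.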

* Here's a real explanation for line 3, in case you're curious about the Scala magic that's going on. Normally, all RDDs that look like key-value pairs (having a generic type of Tuple2, like [K, V]) are implicitly converted to PairRDDFunctions, to provide extra functions that can operate over these types of RDDs. For reasons slightly unclear, the author of this code chose to forgo using the implicit conversion in favor of explicitly converting the CoGroupedRDD into a PairRDDFunctions in order to gain access to the mapValues method.
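
In case it helps, here is a rough sketch (again my own illustration, assuming an existing SparkContext sc) of how that implicit conversion is normally used, rather than constructing a PairRDDFunctions by hand:

import org.apache.spark.SparkContext._   // brings the RDD-to-PairRDDFunctions implicit into scope

// Any RDD of key-value pairs picks up PairRDDFunctions methods (mapValues,
// join, reduceByKey, ...) through the implicit conversion.
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2)))
val doubled = pairs.mapValues(_ * 2)     // mapValues resolved via the implicit conversion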



Re: Hash Join in Spark

Aaron Davidson
Regarding your initial question, I am not certain exactly what you mean by the properties you mention. Here are some details about the hash join that at least use those words, but perhaps not as you intended them.

The hash join is blocking in the sense that we will aggregate all inputs to a partition before returning any results. In CoGroupedRDD, we explicitly iterate over all input key-value pairs to build the map before returning anything. The hash join is non-blocking in the sense that we will stream the results of a particular partition up to any remaining narrow dependencies (i.e., dependencies that don't require a shuffle) before moving on to processing the next partition. Thus, the degree to which we pipeline the hash join is completely dependent on our Partitioner, which is user-customizable. This is simply a property of Spark's staged execution model (good resources: video or slides).

In Sparkland, the join method is generalized so that the user can specify a Partitioner, such as a RangePartitioner or HashPartitioner. No matter what, the result is effectively a hash join, as each resulting partition acts as a bucket, and we join buckets in the same partition using a hash map. To make this very concrete, a HashPartitioner takes a desired number of partitions and simply buckets every input pair into a partition by hashing its key. Dead simple. Once we have 2 RDDs partitioned in the same manner, we take 2 correlated partitions and throw them both into a hash map to aggregate all the values from both that share the same keys.
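
As a hedged illustration of that last step (my own simplified sketch, not Spark's actual CoGroupedRDD code), joining two co-partitioned buckets with a hash map could look roughly like this:

import scala.collection.mutable

// Simplified sketch: cogroup one partition from each side by building a hash
// map keyed on K. Note the blocking behavior mentioned above: both inputs are
// fully consumed before any result is emitted.
def hashJoinPartition[K, V, W](
    left: Iterator[(K, V)],
    right: Iterator[(K, W)]): Iterator[(K, (Seq[V], Seq[W]))] = {
  val map = mutable.HashMap.empty[K, (mutable.ArrayBuffer[V], mutable.ArrayBuffer[W])]
  def entry(k: K) = map.getOrElseUpdate(k, (mutable.ArrayBuffer[V](), mutable.ArrayBuffer[W]()))

  left.foreach  { case (k, v) => entry(k)._1 += v }
  right.foreach { case (k, w) => entry(k)._2 += w }

  map.iterator.map { case (k, (vs, ws)) => (k, (vs.toSeq, ws.toSeq)) }
}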

There is some special logic to ensure that if one RDD is already partitioned correctly, it will not be re-partitioned (or it won't be shuffled if it doesn't have to be). This is the difference between NarrowCoGroupSplitDep and ShuffleCoGroupSplitDep. Each dependency (input RDD) of the cogroup is independent; any subset can be already-partitioned or not, and the least amount of work necessary to get them into order will be done.
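
At the usage level (hypothetical RDD names rddA and rddB, hedged sketch), pre-partitioning both sides with the same partitioner is what makes the cogroup's dependencies narrow:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair RDD functions such as partitionBy and join

val part = new HashPartitioner(8)

// rddA and rddB are hypothetical RDD[(K, V)] / RDD[(K, W)] inputs.
val left  = rddA.partitionBy(part)    // shuffled once into 8 hash buckets
val right = rddB.partitionBy(part)    // shuffled once with the same partitioner

// Both sides now share part, so the join's dependencies on them are narrow
// (NarrowCoGroupSplitDep) and no additional shuffle is needed.
val joined = left.join(right)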

