Spark limitations question

Hello friends,

I'm currently evaluating Spark as a data pipeline solution. The PySpark version sounds especially exciting and I've been tinkering with it. But some of my data sets are 100 billion+ rows, and I need to make sure Spark can serve my needs.

I'm currently testing a join of two data sets, Base and Skewed. Each is 100 million rows, and they look like the following.

Base:
{"id": "0", "num": 0}
{"id": "1", "num": 1}
{"id": "2", "num": 2}
{"id": "3", "num": 3}
...

Skewed:
{"id": "0", "num": 0}
{"id": "1", "num": 1}
{"id": "0", "num": 0}
{"id": "3", "num": 3}
{"id": "0", "num": 0}
{"id": "5", "num": 5}
{"id": "0", "num": 0}
...
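
For reference, this is roughly how I generated the two inputs (a sketch; the partition count and the generate-per-partition trick are just what I happened to use):

import json

N = 100 * 1000 * 1000   # 100 million rows
P = 400                 # partitions to generate in parallel (arbitrary)

def ids_for_partition(p):
    # each partition emits its own slice of the id range
    return range(p * (N // P), (p + 1) * (N // P))

# Base: one row per id, "num" mirrors "id"
(sc.parallelize(range(P), P)
   .flatMap(ids_for_partition)
   .map(lambda i: json.dumps({"id": str(i), "num": i}))
   .saveAsTextFile("100million"))

# Skewed: every other row reuses id "0"
(sc.parallelize(range(P), P)
   .flatMap(ids_for_partition)
   .map(lambda i: json.dumps({"id": "0", "num": 0} if i % 2 == 0
                             else {"id": str(i), "num": i}))
   .saveAsTextFile("100million_skewed"))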

I have two tests:
1. join Base to itself, sum the "nums" and write out to HDFS
2. same as 1 except join Base to Skewed
(I realize the outputs are contrived and meaningless, but again, I'm testing limits here)

Test 1 works amazingly fast.
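
For reference, test 1 is just a self-join of the base RDD from the code at the bottom of this mail; roughly (the output path and format are arbitrary):

# Test 1: self-join Base, sum the two "num" fields, write out to HDFS
(base.join(base)
     .map(lambda kv: json.dumps({"id": kv[0],
                                 "num": kv[1][0]["num"] + kv[1][1]["num"]}))
     .saveAsTextFile("output_self_join"))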

Test 2, however, works well on all but one of the nodes in the cluster. That one node runs out of memory quickly and dies. Each node has 10 GB of memory available to the Spark executor and the remaining ~60 GB available to Python, which AFAIK is more than enough to hold both datasets many times over.
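
For context, the executors are sized roughly like this when I build the context (a sketch; the master URL is a placeholder):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("skew-join-test")
        .setMaster("spark://master:7077")          # placeholder
        .set("spark.executor.memory", "10g"))      # 10 GB per executor
sc = SparkContext(conf=conf)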

See code below.

I'm assuming the ~50 million skewed rows (all sharing id "0") pile up on that one particular node, which then runs out of memory while it tries to merge them.

So is this normal? A known problem? If it is, what can I do to remedy the issue? Any further experiments I can run?
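
One experiment I've been considering is salting the hot key so the id "0" rows spread across several reducers, at the cost of replicating Base. Roughly (the salt factor is arbitrary):

import random

SALT = 32  # number of sub-keys to split a hot key into (arbitrary)

# replicate each Base row once per salt value ...
salted_base = base.flatMap(
    lambda kv: [((kv[0], s), kv[1]) for s in range(SALT)])

# ... and scatter each Skewed row to a random salt bucket
salted_skewed = skewed.map(
    lambda kv: ((kv[0], random.randrange(SALT)), kv[1]))

# join on (id, salt), then drop the salt again
joined = (salted_base.join(salted_skewed)
                     .map(lambda kv: (kv[0][0], kv[1])))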

Thanks for any time you can spare


-Roman

---------------------------

import json

base = (sc.textFile("100million/part-*")
          .map(json.loads)
          .keyBy(lambda x: x["id"]))

skewed = (sc.textFile("100million_skewed/part-*")
            .map(json.loads)
            .keyBy(lambda x: x["id"]))

(base.join(skewed)
     .map(merge_the_dicts)  # just assume this function exists/works
     .map(json.dumps)
     .saveAsTextFile("output"))