spark is running extremely slow with larger data set, like 2G

xuhongnever
My Spark version is 1.1.0, pre-built with Hadoop 1.x.
My code is written in Python and converts a graph data set from an edge list to an adjacency list.

Spark is running in standalone mode.

It runs well on a small data set such as soc-LiveJournal1, about 1 GB.

Then I ran it on the 25 GB Twitter graph, and one task would not finish even after hours.

I tried reading the input from both NFS and HDFS.

What could be the problem?

Re: spark is running extremely slow with larger data set, like 2G

xuhongnever
My code is here:

from pyspark import SparkConf, SparkContext

def Undirect(edge):
    # Keep only lines whose first field is a vertex id and emit a (src, dst) pair.
    vector = edge.strip().split('\t')
    if vector[0].isdigit():
        return [(vector[0], vector[1])]
    return []

conf = SparkConf()
conf.setMaster("spark://compute-0-14:7077")
conf.setAppName("adjacencylist")
conf.set("spark.executor.memory", "1g")

sc = SparkContext(conf=conf)

file = sc.textFile("file:///home/xzhang/data/soc-LiveJournal1.txt")
# Build each vertex's neighbor list by concatenating neighbors into a tab-separated string.
records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b: a + "\t" + b)
#print(records.count())
#records = records.sortByKey()
records = records.map(lambda line: line[0] + "\t" + line[1])
records.saveAsTextFile("file:///home/xzhang/data/result")

Re: spark is running extremely slow with larger data set, like 2G

Akhil
Try providing the level of parallelism parameter to your reduceByKey operation.
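
For example, something like this (just a sketch; 200 partitions is only a placeholder to tune for your cluster and data size):

# pass an explicit numPartitions so the shuffle is spread over more reduce tasks
records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b: a + "\t" + b, 200)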

Thanks
Best Regards



Re: spark is running extremely slow with larger data set, like 2G

Davies Liu-2
On Thu, Oct 23, 2014 at 3:14 PM, xuhongnever <[hidden email]> wrote:

> my code is here:
>
> from pyspark import SparkConf, SparkContext
>
> def Undirect(edge):
>     vector = edge.strip().split('\t')
>     if(vector[0].isdigit()):
>         return [(vector[0], vector[1])]
>     return []
>
>
> conf = SparkConf()
> conf.setMaster("spark://compute-0-14:7077")
> conf.setAppName("adjacencylist")
> conf.set("spark.executor.memory", "1g")

Use more memory to get better performance; otherwise Spark will keep
spilling data to disk, which is much slower.
You can also give more memory to the Python workers by setting
spark.python.worker.memory=1g or 2g.
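
For example, a rough sketch (the 4g executor size is only a placeholder; use what your nodes actually have free):

conf.set("spark.executor.memory", "4g")       # more JVM heap per executor -- placeholder size
conf.set("spark.python.worker.memory", "2g")  # soft limit before each Python worker spills to disk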

> sc = SparkContext(conf = conf)
>
> file = sc.textFile("file:///home/xzhang/data/soc-LiveJournal1.txt")
> records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b: a + "\t" + b)

a + "\t" + b will be very slow, if the number of values is large,
groupByKey() will be better than it.
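
For example, something along these lines (only a sketch, not tested on your data; it also replaces the later map step):

# groupByKey gathers all neighbors of a vertex, then we join them once per key
records = file.flatMap(lambda line: Undirect(line)) \
              .groupByKey() \
              .map(lambda kv: kv[0] + "\t" + "\t".join(kv[1]))
records.saveAsTextFile("file:///home/xzhang/data/result")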

> #print(records.count())
> #records = records.sortByKey()
> records = records.map(lambda line: line[0] + "\t" + line[1])
> records.saveAsTextFile("file:///home/xzhang/data/result")


Re: spark is running extremely slow with larger data set, like 2G

xuhongnever
Thank you very much.
Changing to groupByKey works; it runs much faster now.

By the way, could you give me some explanation of the following configuration options? After reading the official documentation I am still confused: what is the relationship between them, and is there any memory overlap between them?

spark.python.worker.memory
spark.executor.memory
spark.driver.memory

Re: spark is running extremely slow with larger data set, like 2G

Davies Liu-2
On Fri, Oct 24, 2014 at 1:37 PM, xuhongnever <[hidden email]> wrote:

> Thank you very much.
> Changing to groupByKey works, it runs much more faster.
>
> By the way, could you give me some explanation of the following
> configurations, after reading the official explanation, i'm still confused,
> what's the relationship between them? is there any memory overlap between
> them?
>
> *spark.python.worker.memory
> spark.executor.memory
> spark.driver.memory*

spark.driver.memory is used for the JVM that runs alongside your local Python
script (the driver).
spark.executor.memory is used for the JVMs in the Spark cluster (the slaves,
or executors).

In local mode, the driver and executor share the same JVM, so
spark.driver.memory is the setting that applies.

spark.python.worker.memory is used for the Python workers inside each executor.
Because of the GIL, PySpark runs multiple Python processes per executor, one
for each task. spark.python.worker.memory tells a Python worker when to spill
its data to disk. It is not a hard limit, so the memory actually used by a
Python worker may be a little higher than the setting.
If you have enough memory in the executors, increasing spark.python.worker.memory
lets the Python workers use more memory during shuffles (like groupBy()), which
will improve performance.
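
As a rough sketch of how they might be set together (the sizes are placeholders; note that spark.driver.memory normally has to be given when the driver JVM is launched, for example via spark-submit --driver-memory, rather than from SparkConf inside the script):

from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.executor.memory", "4g")       # JVM heap of each executor (slave) -- placeholder size
conf.set("spark.python.worker.memory", "2g")  # soft limit before a Python worker starts spilling to disk
# spark.driver.memory sizes the driver JVM; in local mode that single JVM does all the work.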
