How to efficiently join these two complicated RDDs


How to efficiently join these two complicated RDDs

hanbo

Sincere thanks for your work on Spark. It greatly simplifies parallel and iterative programming, and it means a lot to us.


Our Spark program faces a small problem, and we are seeking your help to find an efficient way to solve it.

 

Environments:

We run spark in standalone mode. We have two RDDs:

RDD detail:

Type one RDD: generated from sc.textFile('file'); each line in 'file' is a list of keys, like the following lines:


1 149 255 2238 4480 5951 7276 7368 14670 12661 13060 13450 14674

1 149 255 2238 4480 5951 7276 7368 7678 12672 13078 13450 14674

1 149 257 2239 4485 5952 7276 7368 7678 12683 13096 13450 14674

1 149 259 2241 4487 5954 7276 7368 7678 12683 13096 14673 14674

1 149 260 2242 4488 5955 7276 7368 14670 14671 14672 14673 14674

1 151 258 2240 4486 5953 7276 7368 14670 12684 13096 13450 14674

1 151 258 2240 4486 5953 7276 7368 14670 14671 14672 13450 14674

1 151 259 2241 4487 5954 7276 7368 7678 12683 13096 13450 14674

1 153 250 2237 4472 5950 7276 7368 14670 14671 13078 14673 14674

1 153 258 2240 4486 5953 7276 7368 7678 12683 13096 14673 14674

...

Type two RDD: a set of (key, value) pairs.

 

The problem we want to solve:

For each line in RDD one, we need to look up, in the type two RDD, the value associated with each key of that line, and then compute the sum of these values.

 

Other Details:

Each line of the type one RDD contains about 50 keys, and the file for RDD one is about 10 GB.

The largest key in RDD two is about 4,500,000, and we do not store a (key, value) pair when its value is zero.

Also, the keys in the type one RDD are not evenly distributed: for example, key 1 appears in a great many lines, while key 15877 appears in only a few.

 

We are looking for a fast way to solve this problem.

Sincere thanks,


    Bo Han

Re: How to efficiently join these two complicated RDDs

Eugen Cepoi
Hi,

What is the size of RDD two?
You want to map a line from RDD one to multiple values from RDD two and get the sum of all of them?
So as a result you would have an RDD the size of RDD1, containing one number per line?




Re: How to efficiently join these two complicated RDDs

hanbo
Thanks for your reply:

    What is the size of RDD two?
    RDD two is a paired RDD; during the iterations, its size may vary from 40,000 to 4,500,000.

    You want to map a line from RDD one to multiple values from RDD two and get the sum of all of them?
    Yes

    So as result you would have an rdd of size RDD1 and containing a number per line?
    Yes

    Thank you again. This problem has puzzled us for several days...

Re: How to efficiently join these two complicated RDDs

hanbo
In reply to this post by Eugen Cepoi
We are trying to implement an iterative algorithm.
During each iteration, we need to solve the problem raised in the first post, and the size of RDD two may change between about 40,000 and 4,500,000.

Re: How to efficiently join these two complicated RDDs

Guillaume Pitel
In reply to this post by hanbo
Here's what I would do:

RDD1 :
"1" "2" "3"
"1" "3" "5"

RDD2 :
("1", 11)
("2", 22)
("3", 33)
("5", 55)

1/ flatMap your lines from RDD1 to RDD1bis (key, lineId) (you'll have to use mapPartitionsWithIndex for that in order to produce a lineId)
So you go from this :

"1" "2" "3"
"1" "3" "5"

to :

("1","L1")
("2","L1")
("3","L1")
("1","L2")
("3","L2")
("5","L2")

2/ join RDD1 and RDD2 => RDD1+2

("1",("L1",11))
("2",("L1",22))
("3",("L1",33))
("1",("L2",11))
("3",("L2",33))
("5",("L2",55))

3/ map RDD1+2 to _2, i.e. keep only (lineId, value)
4/ groupBy _1 (lineIds) and sum the values
("L1",[11,22,33]) => ("L1",66)
("L2",[11,33,55]) => ("L2",99)

Guillaume



--
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05

Re: How to efficiently join these two complicated RDDs

Eugen Cepoi
If the largest version of RDD two (about 4,500,000 entries) fits in memory on your driver node and on each worker, you can collect it, broadcast it to the workers, and use it in a map function. Assuming you have at most 4,500,000 entries, each being a pair of ints (4 bytes each), it would be well under 1 gigabyte (around 0.03 GB if I am not wrong). So you can probably try this solution. If you worry about network traffic, doing a groupBy etc. would incur traffic anyway due to the shuffled data. The main drawback is that your driver will have extra work to do.
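
A minimal sketch of this broadcast approach, reusing the same assumed lines_rdd / pairs_rdd names as in the sketch above:

lookup = pairs_rdd.collectAsMap()   # pull RDD two to the driver as a plain dict
lookup_bc = sc.broadcast(lookup)    # ship it once to every worker

def line_sum(line):
    table = lookup_bc.value
    # keys missing from the dict correspond to the zero values that were not stored
    return sum(table.get(int(k), 0) for k in line.split())

sums = lines_rdd.map(line_sum)      # one number per line of RDD one

No shuffle is involved; the only data movement is the broadcast of RDD two itself.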




Re: How to efficiently join these two complicated RDDs

zhaoxw12
Thank you very much for your answer. We have tried the method above before; these are the problems we ran into.

1. We want to avoid the collect method because this step runs inside the iteration and RDD2 changes in every iteration, so the speed, memory usage, and network traffic concern us a lot.

2. The keys in RDD1 are not evenly distributed. For example, key "1" appears in every line, while key "1765" may occur fewer than 10 times in all of RDD1. This causes some workers to have more data to process and to take more time.

We have done some experiments using data of the same form but much smaller size. The method above takes more than 10 minutes, while collecting RDD2 with collectAsMap and sending it to each worker takes about 2 minutes. However, the second method gets an OutOfMemory error when we try it on the big data.

Re: How to efficiently join these two complicated RDDs

hanbo
In reply to this post by Guillaume Pitel
Thank you for your reply.
   We have tried this method before, but step 2 is very time-consuming because the number of values per key is not evenly distributed. Some keys in the lines of RDD1 are very dense, while others are very sparse, so after the join, the splits containing the dense keys are very large and slow to process. We don't know how to solve this. Do you have a more efficient way?


   2 / join RDD1 and RDD2 => RDD1+2
    ("1",("L1",11))
    ("2",("L1",22))
    ("3",("L1",33))
    ("1",("L2",11))
    ("3",("L2",33))
    ("5",("L2",55))

   

Re: How to efficiently join these two complicated RDDs

Guillaume Pitel
Actually, even without the skewness problem, the solution I proposed is really not efficient, since it generates a lot of data. What you have is very close to a sparse matrix * sparse vector computation, so in my opinion you should split your data into blocks and do the join on blocks.

Have a look at how the ALS implementation does it; I think it should be efficient for your problem.

Does RDD1 change? If not, you can prepare your blocks once and iterate just as ALS does. Also, since you don't have to perform a computation on each row, you can split the sparse matrix both row-wise and column-wise.
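
A rough sketch of the block idea, not the actual ALS code: keys are hashed into a fixed, purely illustrative number of blocks, each line contributes at most one record per block, and RDD2 becomes one small dictionary per block, so a very dense key no longer produces one join row per occurrence (same assumed lines_rdd / pairs_rdd names as before):

NUM_BLOCKS = 64  # illustrative; tune to your data

def block_of(key):
    return key % NUM_BLOCKS

# Matrix side: one record per (block, line), holding only that line's keys in that block.
def line_blocks(partition_index, iterator):
    for offset, line in enumerate(iterator):
        line_id = (partition_index, offset)
        by_block = {}
        for k in line.split():
            key = int(k)
            by_block.setdefault(block_of(key), []).append(key)
        for b, keys in by_block.items():
            yield (b, (line_id, keys))

matrix_blocks = lines_rdd.mapPartitionsWithIndex(line_blocks)

# Vector side: one small dict per block of RDD two.
vector_blocks = (pairs_rdd.map(lambda kv: (block_of(kv[0]), kv))
                          .groupByKey()
                          .mapValues(dict))

# Join block to block, compute one partial sum per (line, block), then total per line.
sums = (matrix_blocks.join(vector_blocks)
        .map(lambda rec: (rec[1][0][0],
                          sum(rec[1][1].get(k, 0) for k in rec[1][0][1])))
        .reduceByKey(lambda a, b: a + b))

Since RDD1 does not change, matrix_blocks can be built once, cached, and reused across iterations; only vector_blocks needs to be rebuilt from the new RDD2.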

Guillaume




Re: How to efficiently join these two complicated RDDs

hanbo
In reply to this post by Eugen Cepoi
We have implemented it this way, using PySpark in standalone mode. We collect the new RDD2 in each iteration. The Java heap memory used by the driver program increases gradually, and the job finally crashes with an OutOfMemory error.

We have done some tests in which, in each iteration, we simply collect a vector. Even this small, simple program uses more and more Java heap memory and finally raises OutOfMemory.

We don't know why the memory keeps increasing. Is it caused by the DAG information, or by some variable related to the collect function?


Re: How to efficiently join these two complicated RDDs

hanbo
In reply to this post by Guillaume Pitel
RDD1 doesn't change.
Thank you for your advice; we will have a look at how ALS iterates.

Re: How to efficiently join these two complicated RDDs

Eugen Cepoi
In reply to this post by hanbo
Yeah, this is because the broadcast variables are kept in memory, and I am guessing they are referenced in a way that prevents them from being garbage collected...
A solution could be to enable spark.cleaner.ttl, but I don't like it much, as it sounds more like a hacky workaround.
There is also a PR that was merged a few days ago, https://github.com/apache/incubator-spark/pull/543, but unfortunately it is not part of Spark 0.9 :(

However, you can have a look at the source code and maybe implement something similar in your job, so that at the end of each iteration you remove the broadcast variables; it should be possible through SparkEnv.get.blockManager.
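
For the spark.cleaner.ttl option mentioned above, a minimal PySpark setup might look like the following; the TTL value is purely illustrative and must be longer than anything the job still depends on (cached RDDs included):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("iterative-join")
        .set("spark.cleaner.ttl", "3600"))  # periodically clean metadata older than 3600 seconds
sc = SparkContext(conf=conf)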




Re: How to efficiently join these two complicated RDDs

Eugen Cepoi
BTW, try loading only the largest version of RDD2 (I guess it corresponds to the last iterations), cache that RDD, do a count on it, and look in the web UI to see what the size of the RDD is. If you have enough RAM on each worker and on the driver, the broadcast solution should fit, provided you clean up the old broadcast variables.
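
A quick way to do that check, with the same assumed pairs_rdd name standing in for the largest version of RDD2:

pairs_rdd.cache()   # mark it for in-memory storage
pairs_rdd.count()   # force it to materialize
# its in-memory size then appears on the Storage tab of the Spark web UI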

