Repartition not working on a csv file


AbdealiJK
I am using Spark 2.3.0 and trying to read a CSV file which has 500 records.
When I read it, Spark shows that it has two stages: 10 and 11, which then join into stage 12.

This makes sense and is what I would expect, as I have 30 map-based UDFs, after which I do a join, run another 10 UDFs, and then save the result as Parquet.

Stages 10 and 11 have only 2 tasks according to Spark. My cluster allows a maximum of 20 executors, and I would like Spark to use all 20 for this job.

1csv+Repartition: Right after reading the file, if I do a repartition, it still uses only 2 tasks.
1csv+Repartition+count(): Right after reading the file, if I do a repartition and then an action like count(), it still uses only 2 tasks.
50csv: If I split my 500-line CSV into 50 files with 10 lines each, it uses 18 tasks.
50csv+Repartition: If I split my 500-line CSV into 50 files with 10 lines each, and do a repartition and a count, it uses 19 tasks.
500csv+Repartition: If I split my 500-line CSV into 500 files with 1 line each, and do a repartition and a count, it uses 19 tasks.

All repartitions above are: .repartition(200)

I can't understand what it's trying to do.
I was expecting that if I do a .repartition(200) it would just create 200 tasks after shuffling the data. But it's not doing that.
I recall this worked fine on Spark 1.6.x.

PS: The reason I want more tasks is that those UDFs are very heavy and slow - I'd like to use more executors to reduce computation time. I'm sure they are parallelizable ...
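
For reference, a minimal PySpark sketch of the pattern described above (the path, options, and DataFrame names are illustrative, not the actual job):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # single small CSV, ~500 records
    df = spark.read.csv("/path/to/input.csv", header=True)

    # hoped-for effect: 200 partitions, so up to 200 tasks for the downstream UDF stages
    df = df.repartition(200)
    df.count()  # action to force evaluation; the stages still show only 2 tasks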


Re: Repartition not working on a csv file

yujhe.li

What's your CSV size per file? I think the Spark optimizer may pack many small files into a single task when reading them.
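
If that is what's happening, the input-split settings are worth a look. A sketch (these are standard Spark 2.x SQL options; the tiny values are only to make the effect visible on very small files):

    # make input partitions small so many tiny files are not packed into one task
    spark.conf.set("spark.sql.files.maxPartitionBytes", 1024)  # default is 128 MB
    spark.conf.set("spark.sql.files.openCostInBytes", 0)       # default is 4 MB, added per file

    df = spark.read.csv("/path/to/input_dir", header=True)
    print(df.rdd.getNumPartitions())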





Re: Repartition not working on a csv file

AbdealiJK
My entire CSV is less than 20KB. 
Somewhere in between, I do a broadcast join with 3500 records from another file.
After the broadcast join I have a lot of processing to do. Overall, the time to process a single record goes up to 5 minutes on 1 executor.

I'm trying to increase the number of partitions my data is in so that I have at most 1 record per executor (currently it creates 2 tasks, and hence uses 2 executors... I want it to split into at least 100 tasks, so I get 5 records per task => ~20 min per task).
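
(For context, the broadcast join step looks roughly like this - a PySpark sketch with made-up path and column names:)

    from pyspark.sql.functions import broadcast

    lookup = spark.read.csv("/path/to/lookup.csv", header=True)  # ~3500 records
    joined = df.join(broadcast(lookup), on="key", how="left")    # broadcast hint keeps the join map-side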




Re: Repartition not working on a csv file

yujhe.li

Maybe you can try repartition(100) right after the broadcast join; the task number should change to 100 for your later transformations.
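
Something like this (a sketch; heavy_udf is just a stand-in for the slow UDFs):

    joined = df.join(broadcast(lookup), on="key", how="left")
    joined = joined.repartition(100)  # shuffle into 100 partitions before the heavy work
    result = joined.withColumn("out", heavy_udf("some_col"))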





Re: Repartition not working on a csv file

AbdealiJK
I've tried that too - it doesn't work. It does a repartition, but not right after the broadcast join - it does a lot more processing and performs the repartition right before my next sort-merge join (the stage 12 I described above).
As the heavy processing happens before the sort-merge join, it still doesn't help.
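
(The physical plan makes this visible - a quick sketch of how to check where the exchange ends up, using the hypothetical names from above:)

    # shows where the Exchange introduced by repartition(100) actually sits
    # relative to the BroadcastHashJoin and the later SortMergeJoin
    joined.repartition(100).explain()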



Re: Repartition not working on a csv file

Alexander Czech-2
You could try to force a repartition right at that point by producing a cached version of the DF with .cache(), if memory allows it.
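
A sketch of that idea (run_heavy_udfs is just a stand-in for the UDF chain):

    # materialize the repartitioned DataFrame so the planner cannot
    # move the exchange to a later point in the plan
    repartitioned = joined.repartition(100).cache()
    repartitioned.count()                   # action to populate the cache
    result = run_heavy_udfs(repartitioned)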




Re: Repartition not working on a csv file

AbdealiJK
I prefer not to do a .cache() due to memory limits, but I did try persist() with DISK_ONLY.

I did the repartition(), followed by a .count(), followed by a persist() with DISK_ONLY.
That didn't change the number of tasks either.
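
Roughly the DISK_ONLY variant of the same idea (a sketch; persist is placed before the count here, since it only takes effect once an action materializes the data):

    from pyspark import StorageLevel

    repartitioned = joined.repartition(100)
    repartitioned.persist(StorageLevel.DISK_ONLY)
    repartitioned.count()  # materialize to disk; the task count for later stages was still unchanged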


