Identify bottleneck


Antoine DUBOIS
Hello

I'm working on an ETL that takes CSV files describing file systems and transforms them into Parquet, so I can work on them easily and extract information.
I'm using Mr. Powers' framework Daria to do so. I have quite varied inputs and a lot of transformations, and the framework helps organize the code.
I have a stand-alone Spark 2.3.2 cluster composed of 4 nodes with 8 cores and 32GB of memory each.
The storage is handled by a CephFS volume mounted on all nodes.
First, a small description of my algorithm (it's quite simple); a rough sketch follows the list:

Use SparkContext to load the csv.bz2 file,
Chain a lot of withColumn() statement,
Drop all unnecessary columns,
Write parquet file to CephFS
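
A minimal sketch of what this looks like in code (Spark Scala API; the paths and column names below are made up for illustration, and the real job chains many more transformations through Daria):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("fs-csv-to-parquet").getOrCreate()

// Spark decompresses the bz2 file transparently while reading the CSV.
val raw = spark.read
  .option("header", "true")
  .csv("file:///mnt/cephfs/fs_dump.csv.bz2")        // hypothetical path on the CephFS mount

val transformed = raw
  .withColumn("size_gb", col("size").cast("long") / (1024L * 1024 * 1024))
  .withColumn("mtime_ts", to_timestamp(col("mtime")))
  // ... a dozen more withColumn() steps in the real job ...
  .drop("size", "mtime")                            // drop the now-unnecessary columns

transformed.write.mode("overwrite").parquet("file:///mnt/cephfs/fs_dump.parquet")
```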

This processing can take several hours depending on how many lines the CSV has, and I wanted to identify whether bz2 or the network could be an issue,
so I ran the following tests (several times, with consistent results).
I tried the following scenarios with 20 cores and 2 cores per task:
  • Read the csv.bz2 from CephFS, with a 1Gb/s connection for each node: ~5 minutes.
  • Read the csv.bz2 from tmpfs (set up to look like a shared storage space): ~5 minutes.
  • From the two previous tests I concluded that decompressing the file was part of the bottleneck, so I decompressed the file and stored it in tmpfs as well; result: ~5.9 minutes.
The test file has 25'833'369 lines and is 370MB compressed and 3700MB uncompressed. These results have been reproduced several times each.
My question is: what am I bottlenecked by in this case?

I thought that reading the uncompressed file from RAM would be the fastest. Is it possible that my program reads the CSV suboptimally?
In the execution logs on the cluster I see 5 to 10 seconds of GC time at most, and the timeline shows mainly CPU time (no shuffling, and no other significant overhead either).
I also noticed that memory storage is never used during the execution. I know from several hours of research that bz2 is the only real compression codec usable as a Spark input, for parallelization (splittability) reasons.

Do you have any idea why it behaves this way,
and any idea how to improve this processing?

Cheers

Antoine


Re: Identify bottleneck

Enrico Minack
How many withColumn statements do you have? Note that it is better to use a single select rather than lots of withColumn calls; this also makes the drops redundant.
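
For illustration, the difference looks roughly like this (toy column names, not the real job; every withColumn call adds another projection for the analyzer to process, while one select expresses all derived columns at once):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("select-vs-withColumn").getOrCreate()
import spark.implicits._

// Toy DataFrame standing in for the parsed CSV.
val df = Seq(("1073741824", "2019-12-18 10:56:00")).toDF("size", "mtime")

// Chained withColumn calls: each call creates a new projection, and the
// old columns have to be dropped explicitly at the end.
val viaWithColumn = df
  .withColumn("size_gb", col("size").cast("long") / (1024L * 1024 * 1024))
  .withColumn("mtime_ts", to_timestamp(col("mtime")))
  .drop("size", "mtime")

// A single select builds the same result in one projection; anything not
// selected is dropped implicitly.
val viaSelect = df.select(
  (col("size").cast("long") / (1024L * 1024 * 1024)).as("size_gb"),
  to_timestamp(col("mtime")).as("mtime_ts")
)
```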

Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is really slow. Can you try this on a single machine, i.e. run with "local[*]"?

Can you rule out the writing part by counting the rows? I presume this all happens in a single stage.

Enrico



Re: Identify bottleneck

Antoine DUBOIS
There are 15 withColumn statements and one drop at the end to remove the old columns.
I wish I could write it as a single SQL statement, but that's not reasonable for maintainability.
I will try on a local instance and let you know.

Thanks for the help.




Re: Identify bottleneck

Chris Teoh
Please look at the Spark UI and confirm you are indeed getting more than 1 partition in your dataframe. Compressed text files are often not splittable, so you may just be doing all the work in a single partition.

If that is the case, it may be worthwhile to call the repartition method to distribute your data across multiple partitions so you get more parallelism.
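
A quick way to check this from spark-shell (a sketch; `spark` is the shell's session and the path is hypothetical):

```scala
// How many partitions did the read produce?
val df = spark.read.option("header", "true").csv("file:///mnt/cephfs/fs_dump.csv.bz2")
println(s"input partitions: ${df.rdd.getNumPartitions}")

// If it is 1 (or far below the total core count), spread the downstream
// work over more partitions before the heavy transformations.
val distributed = if (df.rdd.getNumPartitions < 32) df.repartition(64) else df
```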


Re: Identify bottleneck

Enrico Minack
Good points, but single-line CSV files are splittable (not multi-line CSV, though), especially at the mentioned size. And bz2 is also splittable, though reading it is much slower than reading uncompressed CSV.

If your csv.bz2 files are not splittable, then repartitioning does not improve the situation much, because the read still happens through one worker before the repartitioning takes place.

Besides checking the Spark UI SQL tab, you can check that your stage has multiple tasks: ideally 200, and at least 32 to fully employ your cluster.
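
One knob worth checking here (a sketch, again from spark-shell): for a splittable input, the target split size caps how many tasks the read stage gets.

```scala
// Smaller target split size => more input partitions/tasks for a splittable file.
// 32 MB is an illustrative value, not a recommendation.
spark.conf.set("spark.sql.files.maxPartitionBytes", 32L * 1024 * 1024)

val df = spark.read.option("header", "true").csv("file:///mnt/cephfs/fs_dump.csv.bz2")
println(s"read-stage tasks: ${df.rdd.getNumPartitions}")   // aim for at least 32 here
```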



Re: Identify bottleneck

Antoine DUBOIS

I can confirm that the job is able to use multiple cores on multiple nodes at the same time, and that I have several tasks running at the same time.
Depending on the CSV, it takes from 5 partitions up to several hundred.
Regarding the job running locally on one node: it took more than 20 minutes, and I didn't have time to let it finish.





Re: Identify bottleneck

Antoine DUBOIS
Also,
the framework allows executing all the modifications at once as one big request (but I won't paste it here; it would not be very relevant).




Re: Identify bottleneck

Chris Teoh
If you're inferring the schema, that also incurs an overhead while the data is being read into the dataframe.

Are you observing data skew? Perhaps some nodes are busier than others. Look at the average task time compared to the lowest and highest times.

With 20 cores, 2 cores per executor and 10 executors, how many tasks/partitions are in the dataframe? Your executors should each be able to process 2 tasks in parallel.

Additionally, you should aim for (number of input partitions) % (total cores) == 0, so you don't have idle executors.
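
A sketch of both points (the schema fields are made up; supplying the schema avoids the extra inference pass, and the repartition count is chosen as a multiple of the 20 cores in use):

```scala
import org.apache.spark.sql.types._

// Explicit schema: no inference pass over the CSV before the real read.
val schema = StructType(Seq(
  StructField("path", StringType, nullable = true),
  StructField("size", LongType, nullable = true),
  StructField("mtime", TimestampType, nullable = true)
))

val df = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("file:///mnt/cephfs/fs_dump.csv.bz2")

// Pick a partition count that divides evenly by the total cores (here 20),
// e.g. 40 or 60, so no executor sits idle at the tail of the stage.
val balanced = df.repartition(40)
```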


Re: Identify bottleneck

ayan guha
In reply to this post by Enrico Minack
Quick question: why is it better to use one SQL select vs multiple withColumn calls? Isn't everything eventually rewritten by Catalyst?


--
Best Regards,
Ayan Guha

Re: Identify bottleneck

Chris Teoh
As far as I'm aware it isn't any better. The logic all gets processed by the same engine, so to confirm, compare the DAGs generated from both approaches and see if they're identical.
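
A quick way to do that comparison (a sketch, assuming `viaWithColumn` and `viaSelect` are two formulations of the same transformation, as in the earlier example):

```scala
// Print the parsed/analyzed/optimized/physical plans for each variant
// and compare them (or compare the DAGs in the Spark UI after an action).
viaWithColumn.explain(true)
viaSelect.explain(true)

// sameResult canonicalizes expression IDs before comparing the plans.
val samePlan = viaWithColumn.queryExecution.executedPlan
  .sameResult(viaSelect.queryExecution.executedPlan)
println(s"plans considered equivalent: $samePlan")
```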


Re: Identify bottleneck

Enrico Minack

The issue is explained in depth here: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015

Re: Identify bottleneck

ayan guha
Cool, thanks! Very helpful


--
Best Regards,
Ayan Guha

Re: Solved: Identify bottleneck

Antoine DUBOIS
Thank you very much for your help and your input.
I learned a few things, and I finally understood my issue.
In this case my main problem was virtualization: my VMs were running on one small hypervisor. I spread them across multiple hypervisors, and the application now scales properly with the number of cores, and processing uncompressed data is indeed faster.
My bottleneck seems to be the compression.

Thank you all, and have a merry Christmas.




Re: Identify bottleneck

Nicolas Paris-2
In reply to this post by Enrico Minack
Apparently the withColumn issue only applies for hundreds or thousands of calls. That was not the case here (twenty calls).

On Fri, Dec 20, 2019 at 08:53:16AM +0100, Enrico Minack wrote:

> The issue is explained in depth here: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015

--
nicolas
