Writing a DataFrame is taking too long and huge space


Writing a DataFrame is taking too long and huge space

Md. Rezaul Karim
Dear All,

I have a tiny CSV file, around 250MB, with only 30 columns in the DataFrame. Now I'm trying to save the pre-processed DataFrame as another CSV file on disk for later use.

However, I'm getting frustrated because writing the resulting DataFrame is taking far too long, about 4 to 5 hours. Moreover, the file written to disk is about 58GB!

Here's the sample code that I tried:

# Using repartition()
myDF.repartition(1).write.format("com.databricks.spark.csv").save("data/file.csv")

# Using coalesce()
myDF.coalesce(1).write.format("com.databricks.spark.csv").save("data/file.csv")


Any better suggestion?



----
Md. Rezaul Karim, BSc, MSc
Research Scientist, Fraunhofer FIT, Germany

Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

eMail: [hidden email]
Tel: +49 241 80-21527


Re: Writing a DataFrame is taking too long and huge space

Matteo Durighetto
Hello, try using the Parquet format with compression (such as Snappy or LZ4): the produced files will be smaller and will generate less I/O. Moreover, Parquet is normally faster than CSV to read back for further operations.
Another possible format is ORC.
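
For instance, a minimal sketch of a compressed Parquet write, reusing the myDF name and the data/ path from the original post (the compression values are the standard Parquet codecs):

```
# Sketch only: write the pre-processed DataFrame as compressed Parquet instead of CSV.
# Snappy is the usual default; lz4, gzip, or zstd are other supported codecs.
myDF.write \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .parquet("data/file.parquet")    # produces a directory of compressed part files

# Reading it back later for further Spark processing:
# df = spark.read.parquet("data/file.parquet")
```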

Kind Regards

Matteo



Re: Writing a DataFrame is taking too long and huge space

Gourav Dutta
Which version of Spark are you using?

The reason for asking is that from Spark 2.x onwards CSV support is built in, so there is no need to go through the com.databricks.spark.csv package.
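
For example, a minimal sketch with the built-in writer (same placeholder DataFrame and path as in the original post):

```
# Sketch only: Spark 2.x+ ships a CSV data source, so no external package is needed.
myDF.write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("data/file.csv")            # still produces a directory of part files
```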

Moreover, how long this simple task takes depends heavily on your cluster health. Could you please provide the following details (if you are using YARN)?

1. Number of nodes
2. VCores
3. Node Memory


Thanks,
Gourav Dutta


Re: Writing a DataFrame is taking too long and huge space

theikkila
In reply to this post by Md. Rezaul Karim
It sounds like you're doing something more than just writing the same file back to disk. What does your preprocessing consist of?

Sometimes you can save a lot of space by using other formats, but here we're talking about a more than 200x increase in file size, so depending on the transformations you might not get huge savings from another format alone.

If you can give more details about what you are doing with the data, we can probably help with your task.

The slowness probably comes from Spark spilling to disk while squeezing the data into a single partition to write one file. One thing to reconsider is whether you can merge the output files after the job, or even pre-partition the data for its final use case.
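
A hedged sketch of the pre-partitioning idea; the partition column name below is purely hypothetical and would have to match whatever the downstream consumer filters on:

```
# Hypothetical: pre-partition the CSV output by a key the downstream consumer
# uses ("someKey" is a placeholder), instead of forcing everything into one file.
myDF.write \
    .partitionBy("someKey") \
    .option("header", "true") \
    .csv("data/file_partitioned")
```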

- Teemu


Re: Writing a DataFrame is taking too long and huge space

Md. Rezaul Karim
Hi All,

Thanks for the prompt responses, really appreciated! Here is some more info:

1. Spark version: 2.3.0
2. vCore: 8
3. RAM: 32GB
4. Deploy mode: Spark standalone

Operations performed: I applied StringIndexer transformations on some columns and imputed nulls. That's all.
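
For reference, a rough, hypothetical sketch of that kind of preprocessing (the column name and fill value are made up, not taken from the actual job):

```
from pyspark.ml.feature import StringIndexer

# Hypothetical preprocessing along the lines described above.
filled = myDF.na.fill("unknown")                        # impute nulls in string columns
indexer = StringIndexer(inputCol="someCategoricalCol",  # placeholder column name
                        outputCol="someCategoricalCol_idx")
preprocessed = indexer.fit(filled).transform(filled)
```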

Why write back to CSV: I need to write the DataFrame out as CSV so it can be consumed by a non-Spark application. Also, I will need to run the same pre-processing on a larger dataset (about 2GB); this one is just a sample. So writing to Parquet or ORC is not a viable option for me.

I was trying to use Spark only for the pre-processing. By the way, I tried the Spark built-in CSV writer too.




Best,


----
Md. Rezaul Karim, BSc, MSc
Research Scientist, Fraunhofer FIT, Germany

Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

eMail: [hidden email]
Tel: +49 241 80-21527



Re: Writing a DataFrame is taking too long and huge space

Vadim Semenov-2
In reply to this post by Md. Rezaul Karim
Because `coalesce` gets propagated further up the DAG into the last stage, your last stage ends up with only one task.

You need to break your DAG so that your expensive operations run in a stage before the one with `.coalesce(1)`.


--
Sent from my iPhone

Re: Writing a DataFrame is taking too long and huge space

Deepak Sharma
I would suggest repartitioning it into a reasonable number of partitions, maybe 500, and saving it to an intermediate working directory.
Then read all the files from that working directory, coalesce to 1, and save to the final location.
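
A minimal sketch of that two-step approach, assuming the built-in CSV source and placeholder paths:

```
# Step 1: do the expensive work with plenty of parallelism, write intermediate files.
myDF.repartition(500) \
    .write.option("header", "true") \
    .csv("data/intermediate")

# Step 2: read the already-processed files back and write them out as a single file.
spark.read.option("header", "true") \
    .csv("data/intermediate") \
    .coalesce(1) \
    .write.option("header", "true") \
    .csv("data/final")
```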

Thanks
Deepak


Re: Writing a DataFrame is taking too long and huge space

Vadim Semenov-2
You can use `.checkpoint` for that

`df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` run with only one partition, so sorting will take a long time.

`df.sort(…).repartition(1).write...` — `repartition` will add an explicit shuffle stage, but the sort order will be lost, since it's a repartition.

```
sc.setCheckpointDir("/tmp/test")
val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all partitions
checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed partitions in one task, concatenate them, and will write them out as a single file
```


--
Sent from my iPhone

Re: Writing a DataFrame is taking too long and huge space

Silvio Fiorito
In reply to this post by Vadim Semenov-2

Given that you start with ~250MB but end up with 58GB, it seems like you're generating quite a bit of data.

Whether you use coalesce or repartition, writing out 58GB with one core is going to take a while.

Using Spark for the pre-processing but forcing a single output file is not going to be very efficient, since you're asking Spark to give up its parallelism, even if only for the final write stage.

What are you using downstream to read this file, and why does it need to be a single 58GB file? Could you simply keep it in Spark, keep the pipeline optimized, and avoid the data persistence step? For example, if you're using R or Python for downstream processing, you could make that part of your pipeline instead of writing the data out and reading it back in from another system.
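
For instance, if the downstream step is Python anyway, one hedged option, workable only when the result fits in driver memory, is to hand the data over in-process instead of via a huge CSV (toPandas here is illustrative, not a recommendation for a 58GB result):

```
# Hypothetical: keep the downstream step in the same job rather than persisting
# a huge CSV. Only viable if the collected result fits in driver memory.
pdf = myDF.toPandas()   # pandas DataFrame handed straight to the non-Spark code
```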

 

 


Re: Writing a DataFrame is taking too long and huge space

Vadim Semenov-2
In reply to this post by Vadim Semenov-2
But overall, I think the original approach is not the right one.
If you end up with a single file in the tens of gigabytes, the approach probably needs to be reworked.

I don't see why you couldn't just write multiple CSV files with Spark and then concatenate them without Spark.
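
A hedged sketch of that concatenation step, assuming the part files land on the local filesystem and each part carries a header row (paths are placeholders):

```
# Plain Python, no Spark involved: merge the part files into one CSV,
# keeping only the first header.
import glob
import shutil

parts = sorted(glob.glob("data/file_parts/part-*.csv"))
with open("data/file.csv", "w") as out:
    for i, path in enumerate(parts):
        with open(path) as f:
            header = f.readline()
            if i == 0:
                out.write(header)        # write the header only once
            shutil.copyfileobj(f, out)   # append the remaining rows
```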

--
Sent from my iPhone