Out of memory HDFS Multiple Cluster Write

11 messages

Out of memory HDFS Multiple Cluster Write

Ruijing Li
Hi all,

I have encountered a strange executor OOM error. I have a data pipeline using Spark 2.3 and Scala 2.11.12. The pipeline writes its output to one HDFS location as Parquet, then reads the files back in and writes them to multiple Hadoop clusters (all co-located in the same datacenter). It should be a very simple task, but executors are being killed for exceeding container thresholds; from the logs, they are exceeding the memory they were given (we use Mesos as the cluster manager).

The ETL process itself works perfectly fine with the given resources, doing joins and adding columns, and the output is written successfully the first time. The job only fails at the end, when the pipeline reads that output back from HDFS and writes it to the different HDFS cluster paths. (It does a spark.read.parquet(source).write.parquet(dest).)
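Concretely, the copy step amounts to the following (a rough sketch only; the paths and the destPaths collection are placeholders, and spark is the active SparkSession):

```
// Sketch of the fan-out copy: read the finished output and write it once per destination.
val sourceOutputPath = "hdfs://cluster-a/etl/output"                               // placeholder
val destPaths = Seq("hdfs://cluster-b/etl/output", "hdfs://cluster-c/etl/output")  // placeholders

destPaths.foreach { dest =>
  spark.read.parquet(sourceOutputPath).write.parquet(dest)
}
```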

This doesn't really make sense and I'm wondering what configurations I should start looking at. 

-- 
Cheers,
Ruijing Li

Re: Out of memory HDFS Multiple Cluster Write

Chris Teoh
spark.sql.shuffle.partitions might be a start.

Is there a difference between the number of partitions when the Parquet is read back and spark.sql.shuffle.partitions? Is the read partition count much higher than spark.sql.shuffle.partitions?
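A quick way to compare the two (the path is a placeholder, spark is the active session):

```
// Compare the parallelism of the read with the shuffle setting.
val df = spark.read.parquet("hdfs://cluster-a/etl/output")   // placeholder path
println(s"read partitions              = ${df.rdd.getNumPartitions}")
println(s"spark.sql.shuffle.partitions = ${spark.conf.get("spark.sql.shuffle.partitions")}")
```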

Re: Out of memory HDFS Multiple Cluster Write

Ruijing Li
Could you explain why shuffle partitions might be a good starting point?

Some more details: when I write the output the first time, after the logic is complete, I repartition the files down to 20 (having run with spark.sql.shuffle.partitions = 2000) so we don't end up with too many small files. The data is small, about 130 MB per file. When Spark reads the output back it reads it in as 40 partitions and tries to write that out to the different cluster. Unfortunately, during that read-and-write stage the executors drop off.

We keep the HDFS block size at 128 MB.
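So the first write looks roughly like this (a sketch; transformed stands in for the dataframe after our logic, and the path and write mode are assumptions):

```
// Shuffle-heavy logic runs with spark.sql.shuffle.partitions = 2000,
// then we repartition down to 20 before the first write to avoid many small files.
val output = transformed.repartition(20)
output.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet("hdfs://cluster-a/etl/output")
```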

Re: Out of memory HDFS Multiple Cluster Write

Chris Teoh
If you're using Spark SQL, that configuration setting causes a shuffle if the number of your input partitions to the write is larger than that configuration.

Is there anything in the executor logs or the Spark UI DAG that indicates a shuffle? I don't expect a shuffle if it is a straight write. What's the input partition size?
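As a quick check on the shuffle question, explain() on the read side of the copy would show an Exchange operator if a shuffle were involved (the path is just a placeholder):

```
// A plain FileScan with no Exchange in the plan means no shuffle before the write.
val df = spark.read.parquet("hdfs://cluster-a/etl/output")   // placeholder path
df.explain()
```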

Re: Out of memory HDFS Multiple Cluster Write

Ruijing Li
Not for the stage that fails; all it does is read and write. The number of tasks is (# of cores) * (# of executor instances), which for us is 60 (3 cores, 20 executors).

As for the input partition size of the failing stage: when Spark reads the 20 files of 132 MB each, it comes out to 40 partitions.
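Presumably that comes from the default spark.sql.files.maxPartitionBytes of 128 MB: each ~132 MB file is just over one split, so it is read as two splits, and 2 * 20 = 40. A quick sanity check, assuming the defaults:

```
// Rough estimate of read partitions; ignores openCostInBytes packing of the small remainders.
val maxSplitBytes = spark.conf.get("spark.sql.files.maxPartitionBytes").toLong  // 134217728 by default
val fileBytes     = 132L * 1024 * 1024
println(math.ceil(fileBytes.toDouble / maxSplitBytes).toInt * 20)               // ~40
```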



Out of memory HDFS Multiple Cluster Write

Ruijing Li
I managed to make the failing stage work by increasing memoryOverhead to something ridiculous, more than 50% of executor memory. Our spark.executor.memory = 12g, and I bumped spark.mesos.executor.memoryOverhead to 8g.
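In conf terms that is roughly the following (a sketch only; in practice these are passed at submit time, the app name is a placeholder, and the Mesos overhead property takes its value in MiB):

```
import org.apache.spark.sql.SparkSession

// Executor memory settings described above; normally supplied via spark-submit --conf.
val spark = SparkSession.builder()
  .appName("multi-cluster-copy")                          // placeholder
  .config("spark.executor.memory", "12g")
  .config("spark.mesos.executor.memoryOverhead", "8192")  // 8 GB, in MiB
  .getOrCreate()
```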

Can someone explain why this solved the issue? As I understand it, memoryOverhead covers VM overhead and other non-heap items, which a simple read and write should not need much of (albeit the writes go to different Hadoop clusters, the network should be a non-issue since they are on the same machines).

We use spark defaults for everything else.

We call df.repartition(20) on the write after the logic is done (before the failing multiple-cluster-write stage) to avoid Spark's small-files problem; we reduce from 4000 partitions down to 20.

Re: Out of memory HDFS Multiple Cluster Write

Chris Teoh
I'm not entirely sure what the behaviour is when writing to a remote cluster. It could be that connections are being established for every element in your dataframe; perhaps using foreachPartition would reduce the number of connections? You may have to look at what the executors do when they reach out to the remote cluster.
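Something along these lines, one connection per partition rather than per record (purely an illustration: it writes delimited text through a hand-rolled HDFS client rather than Parquet, and the URIs and paths are placeholders):

```
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row

// One HDFS connection per partition instead of one per row (illustrative only, not Parquet).
val df = spark.read.parquet("hdfs://cluster-a/etl/output")                              // placeholder
df.foreachPartition { rows: Iterator[Row] =>
  val fs  = FileSystem.get(new URI("hdfs://remote-cluster:8020"), new Configuration())  // placeholder
  val out = fs.create(new Path(s"/tmp/copy/part-${java.util.UUID.randomUUID}"))
  try rows.foreach(r => out.write((r.mkString(",") + "\n").getBytes("UTF-8")))
  finally out.close()
}
```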

Re: Out of memory HDFS Read and Write

Ruijing Li
I was experimenting and found something interesting: I hit executor OOMs even if I don't write to remote clusters. So it is purely a dataframe read-and-write issue.
—————————————————————
To recap: I have an ETL data pipeline that does some logic, repartitions to reduce the number of files written, and writes the output to HDFS as Parquet files. Afterwards it reads that output back and writes it to other locations; it doesn't matter whether they are on the same Hadoop cluster or on several. It is a simple piece of code:
```
destPaths.foreach { path =>
  Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path)) match {
    case Success(_) => // log success
    case Failure(e) => // log failure
  }
}
```
However this stage, reading from sourceOutput and writing to the different locations, is the one failing in Spark, even though all the other stages succeed, including the heavy-duty logic. And the data is not too big for Spark to handle.

Only bumping memoryOverhead, together with repartitioning the output to more partitions, 40 to be precise (when it failed, we had partitioned the output to 20 after the logic finished but before writing to HDFS), has made the read-and-write stage succeed.

I don't understand how a Spark read-and-write stage can run into OOM issues. Hoping someone can shed some light on why.

Re: Out of memory HDFS Read and Write

Chris Teoh
Does it work for just a single path input and single output?

Is the destPath a collection that is sitting on the driver?
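For example, something as small as this would isolate it (paths are placeholders):

```
import org.apache.spark.sql.SaveMode

// Single source, single destination, hard-coded paths, no driver-side collection involved.
val df = spark.read.parquet("hdfs://cluster-a/etl/output")
df.write.mode(SaveMode.Overwrite).parquet("hdfs://cluster-a/tmp/copy_test")
```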

Re: Out of memory HDFS Read and Write

Ruijing Li
@Chris destPaths is just a Seq[String] holding the paths we wish to copy the output to. Even if the collection holds only one path, it does not work. However, the job runs fine if we don't copy the output at all: the pipeline succeeds at read input -> perform logic as dataframe -> write output. As for your second question, I'm not sure how Spark handles it; do the executors come back to the driver to read it, or do they have their own copy? I don't see any driver issues, but I will try experimenting with making that Seq into a Dataset[String] instead, if that helps.

@Sumedh That issue looks interesting; I need to dive into it further. From a quick glance, it describes large Parquet files, whereas our data is rather small. Additionally, as described above, our pipeline runs fine with the given resources for read input -> perform logic as dataframe -> write output, but fails on the additional reads and writes, while the JIRA seems to suggest our job should fail or see issues from the start. Lastly, I found that increasing off-heap memory helped more than increasing the executor heap (executor.memoryOverhead vs executor.memory), but we are on Spark 2.3.

Re: Out of memory HDFS Read and Write

Sumedh Wale-2
Parquet reads in Spark need a lot of temporary heap memory due to ColumnVectors and the write block size. See a similar issue: https://jira.snappydata.io/browse/SNAP-3111

In addition, writes also consume a significant amount of heap due to parquet.block.size. One solution is to reduce spark.executor.cores for such a job (note the approximate heap calculation in the ticket) to lower concurrent usage, and also to reduce spark.sql.files.maxPartitionBytes/parquet.block.size to cut the overhead of reads and writes. Another solution is to increase the executor heap. Or use the off-heap configuration with Spark 2.4, which removes the pressure for reads but not for writes.
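For example, something along these lines (values are only illustrative, not tuned for your job; executor cores and heap are submit-time settings):

```
// Smaller read splits and smaller Parquet row groups reduce per-task heap pressure.
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)
// spark.executor.cores / spark.executor.memory would be adjusted via spark-submit --conf.
```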

regards
sumedh
