Spark Executor OOMs when writing Parquet


Arwin Tio
Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...);

df
  .repartition(5000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire dataset is about 1 TB (compressed).

The error looks like this:

```
  20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
        at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
        at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
        at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
        at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
        at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
```

Can anybody give me some pointers on how to tune Spark to avoid this? I am using giant machines (r5d.12xlarge) with 384GB of memory. My executors have about 350GB of memory (-Xmx350000m). Could it be that my heap is too large?

Another issue I found was https://issues.apache.org/jira/browse/PARQUET-222, which seems to suggest that too many columns could be a problem. I don't want to throw away any columns, though.

One more question: why does saving as Parquet create 4 different stages in Spark? You can see in the attached screenshot that there are 4 different stages, all at "save at LoglineParquetGenerator.java:241". I am not sure how to interpret these stages:

[Attached screenshot: Spark UI showing the 4 stages, all labeled "save at LoglineParquetGenerator.java:241"]
Thanks,

Arwin
Re: Spark Executor OOMs when writing Parquet

Chris Teoh
Sounds like you don't have enough partitions. Try repartitioning to 14496 partitions. Are your stages experiencing shuffle spill?
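
As a rough sketch of how a partition count could be derived instead of hard-coded (illustrative only — the partitionsFor helper and the 10x CSV-to-in-memory expansion factor are assumptions, not measurements from this job):

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class RepartitionSketch {

    // Hypothetical helper: size the partition count from the input volume
    // and a target per-partition size, rather than hard-coding e.g. 5000.
    static int partitionsFor(long inputBytes, long expansionFactor,
                             long targetPartitionBytes) {
        return (int) Math.max(1L, (inputBytes * expansionFactor) / targetPartitionBytes);
    }

    static void writeParquet(Dataset<Row> df) {
        // ~1 TB of compressed CSV, assumed ~10x larger once decompressed
        // and parsed; aim for roughly 128 MB per partition (~80k partitions).
        int numPartitions = partitionsFor(1L << 40, 10L, 128L << 20);
        df.repartition(numPartitions)
          .write()
          .parquet("s3://mypath/..."); // path elided as in the original post
    }
}
```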

Re: Spark Executor OOMs when writing Parquet

Arwin Tio
Yes, though it's mostly memory spill (36.9 TiB memory, 895 GiB disk). I was under the impression that memory spill is OK?


(If you're wondering, this is EMR).


Re: Spark Executor OOMs when writing Parquet

Vadim Semenov-3
In reply to this post by Arwin Tio
Based on the error trace, it's likely that the system doesn't have enough off-heap memory: when Parquet does Snappy compression it uses native I/O, which allocates outside the heap. You need to reduce the heap size, which will leave more memory for off-heap operations.

Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)


--
Sent from my iPhone
Re: Spark Executor OOMs when writing Parquet

Chris Teoh
In reply to this post by Arwin Tio
You also have disk spill, which is a performance hit.

Try multiplying the number of partitions by about 20x-40x and see if you can eliminate shuffle spill.
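
(For scale: 20x-40x of the original 5,000 partitions works out to roughly 100,000-200,000 partitions.)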

Re: Spark Executor OOMs when writing Parquet

Arwin Tio
Okay! I didn't realize you could pump those partition numbers up that high. 15000 partitions still failed; I am trying 30000 partitions now. There is still some disk spill, but it is not that high.

Thanks,

Arwin


Re: Spark Executor OOMs when writing Parquet

Arwin Tio
In reply to this post by Vadim Semenov-3
Hi Vadim,

Thank you for the help. It seems that my executor JVMs have plenty of heap space, so off-heap may be the issue here:

[Attached chart of executor JVM heap usage; green = committed, orange = used]

Will simply reducing heap size do the trick?

What about the following config options from https://spark.apache.org/docs/latest/configuration.html:

  • spark.executor.memoryOverhead
  • spark.memory.offHeap.enabled
  • spark.memory.offHeap.size

I will experiment with these at once, but I'm curious to know your thoughts. Is the Snappy off-heap different from the Spark off-heap as allocated by "spark.executor.memoryOverhead"?
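
As a sketch of how these settings differ (illustrative values only): spark.memory.offHeap.* governs Spark's own managed off-heap storage and only takes effect when enabled, while spark.executor.memoryOverhead is general native headroom per executor — which is where direct buffers like Snappy's would typically be counted.

```
import org.apache.spark.sql.SparkSession;

public class OffHeapConfigSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                // General native headroom per executor (codec direct
                // buffers, thread stacks, etc.); not managed by Spark.
                .config("spark.executor.memoryOverhead", "32g")
                // Spark-managed off-heap execution/storage memory;
                // only used when explicitly enabled.
                .config("spark.memory.offHeap.enabled", "true")
                .config("spark.memory.offHeap.size", "16g")
                .getOrCreate();
    }
}
```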

Thanks,

Arwin

Re: Spark Executor OOMs when writing Parquet

Chris Teoh
In reply to this post by Arwin Tio
Yes. Disk spill can be a huge performance hit; with smaller partitions you may avoid it and possibly complete your job faster. I hope you don't hit OOM.

--
Chris
Re: Spark Executor OOMs when writing Parquet

Phillip Henry
In reply to this post by Arwin Tio
Hi, Arwin.

Did you establish that this was due to off-heap memory usage? If you're running on Linux, you could poll pmap <PID> to see how much off-heap memory is being used...
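
A minimal sketch of doing that from inside the JVM (illustrative; assumes Linux with pmap on the PATH and Java 9+ for ProcessHandle):

```
import java.io.IOException;

public class PmapSelf {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Run pmap against this JVM's own PID so mappings outside the
        // Java heap (direct buffers, native libraries) become visible.
        long pid = ProcessHandle.current().pid();
        Process pmap = new ProcessBuilder("pmap", "-x", Long.toString(pid))
                .inheritIO() // write pmap's report to this process's stdout
                .start();
        pmap.waitFor();
    }
}
```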

Phillip

Reply | Threaded
Open this post in threaded view
|

Re: Spark Executor OOMs when writing Parquet

Arwin Tio
Hi Phillip, All

Thanks for all the help. Empirically, increasing the amount of available native memory by reducing the heap size from 90% to 80% solved the problem.

I did not look into the breakdown of off-heap usage, but I would imagine it comes from Snappy's off-heap buffers, as Vadim said, or perhaps from other Spark off-heap internals. I figure that whatever it is, it's not really actionable beyond making more memory available.
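As I understand it, the reason those buffers don't show up in heap usage is that direct byte buffers live in native memory rather than on the Java heap, so heap sizing doesn't reserve room for them; the stack trace shows parquet's SnappyCompressor allocating them via ByteBuffer.allocateDirect. A minimal illustration (my own sketch, not code from the job):

```
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // Allocated from native memory, not the Java heap, so it doesn't
        // count against heap usage; parquet's SnappyCompressor allocates
        // its buffers the same way.
        ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024 * 1024); // 64 MB
        System.out.println("direct=" + buf.isDirect()
                + ", capacity=" + buf.capacity());
    }
}
```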

Other things I tried:

  • Tried setting "-XX:MaxDirectMemorySize" to a high value, based on https://issues.apache.org/jira/browse/SPARK-4073. This was a red herring: when that option is unset, the direct-memory limit already defaults to the maximum heap size, and the OOM here was a native allocation failing at the OS level anyway, which no JVM limit would fix.
  • Tried setting "spark.executor.memoryOverhead" to a higher number - but I got some complaints about exceeding the maximum YARN memory allocation. Actually it is unclear to me the relationship between memoryOverhead, the Spark JVM heap, off-heap, and YARN.

Anyway, it seems all I had to do was reduce my heap size. Does anyone here monitor the direct-memory/off-heap usage of their Spark executors?

Cheers,

Arwin


From: Phillip Henry <[hidden email]>
Sent: January 21, 2020 6:18 AM
To: Arwin Tio <[hidden email]>
Cc: Vadim Semenov <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Spark Executor OOMs when writing Parquet
 
Hi, Arwin.

Did you establish that this was due to off-heap memory usage? If you're running on Linux, you could poll pmap PID to see how much off-heap memory is being used...

Phillip
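For anyone who wants to automate Phillip's suggestion, here is a tiny helper of my own (a hypothetical sketch, not anything from this thread) that runs pmap for a given executor PID; with -x, the last line pmap prints is a total row:

```
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class PmapPoll {
    public static void main(String[] args) throws IOException, InterruptedException {
        String pid = args[0]; // executor PID, e.g. found via jps
        Process p = new ProcessBuilder("pmap", "-x", pid).start();
        String last = null;
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                last = line; // keep only the final "total kB ..." summary row
            }
        }
        p.waitFor();
        System.out.println(last);
    }
}
```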


Re: Spark Executor OOMs when writing Parquet

Vadim Semenov-3
Does anyone here monitor the direct memory/off-heap usage of their Spark executors?

We at Datadog monitor whole-process memory usage (so off-heap is roughly process memory minus heap used).
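That arithmetic is easy to sketch. A rough illustration (my own, Linux-only, and not our actual profiler code): read the process RSS from /proc/self/status and subtract the heap in use:

```
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Paths;

public class OffHeapEstimate {
    public static void main(String[] args) throws IOException {
        long heapUsed = ManagementFactory.getMemoryMXBean()
                .getHeapMemoryUsage().getUsed();
        // The VmRSS line looks like "VmRSS:   123456 kB" (Linux only).
        long rssBytes = Files.lines(Paths.get("/proc/self/status"))
                .filter(l -> l.startsWith("VmRSS:"))
                .mapToLong(l -> Long.parseLong(l.replaceAll("\\D+", "")) * 1024)
                .findFirst()
                .orElse(-1);
        System.out.printf("rss=%d heapUsed=%d approxOffHeap=%d%n",
                rssBytes, heapUsed, rssBytes - heapUsed);
    }
}
```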

We forked Etsy's statsd-jvm-profiler, which reports heap metrics, and added process memory usage.

It then sends metrics via StatsD to Datadog; I think InfluxDB can be used as well.

Here's my talk about how we monitor memory and other things in Spark.



--
Sent from my iPhone