Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.


harelglik
Hi,

We are running a Spark 2.3.1 job on an EMR cluster of 500 r3.2xlarge nodes (60 GB RAM, 8 vCores, 160 GB SSD each). Driver memory is set to 25 GB.

It processes ~40 TB of data using aggregateByKey, for which we specify numPartitions = 300,000.
Map-side tasks succeed, but the reduce-side tasks all fail.
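For reference, the aggregation has roughly this shape (a minimal sketch; the record type and the combine functions below are hypothetical placeholders, not our actual code):

    import org.apache.spark.rdd.RDD

    // Hypothetical record type standing in for our real schema.
    val records: RDD[(String, Long)] = ???

    // numPartitions = 300,000 sets both the reducer count and the
    // width of every MapStatus the driver must later serialize.
    val aggregated = records.aggregateByKey(0L, numPartitions = 300000)(
      seqOp = (acc, v) => acc + v, // fold a value into the accumulator
      combOp = (a, b) => a + b     // merge two accumulators
    )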

We notice the following driver error:
18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
 java.lang.OutOfMemoryError
	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
	at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
	at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
	at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
	at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
	at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
	at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
	at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
	at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
	at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
	at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
	at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
	at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
	at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.OutOfMemoryError
		at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
		at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
		at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
		at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
		at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
		at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
		at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
		at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
		at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
		at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
		at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
		at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
		at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
		... 6 more
We found this reference (https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that was closed in 2016.
Please advise,
Harel.


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Vadim Semenov
You have too many partitions. When the driver gathers the statuses of all map outputs and sends them back to the executors, it chokes on the size of the structure that needs to be GZipped; since that is bigger than 2 GiB, it produces the OOM.
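The 2 GiB cap comes from ByteArrayOutputStream, which MapOutputTracker uses to buffer the serialized statuses: it is backed by a single byte[], a Java array cannot exceed Integer.MAX_VALUE entries, and hugeCapacity throws OutOfMemoryError (exactly as in your trace) no matter how much driver heap is free. A rough back-of-envelope sketch, with assumed figures for illustration only:

    object MapStatusSizeSketch extends App {
      val mapTasks         = 300000L // assumed map-side task count
      val reducePartitions = 300000L // numPartitions from the job

      // Even HighlyCompressedMapStatus tracks per-reducer emptiness,
      // so the worst case is ~1 bit per reduce partition per map task.
      val worstCaseBytes = mapTasks * reducePartitions / 8
      println(f"worst-case payload: ~${worstCaseBytes.toDouble / (1L << 30)}%.1f GiB")

      // The hard cap that ByteArrayOutputStream.hugeCapacity enforces:
      println(s"cap: ${Int.MaxValue} bytes (~2 GiB)")
    }

Even if GZip shaves that down considerably, anything that compresses to more than ~2 GiB cannot fit in the buffer.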



Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

harelglik
I understand the error occurs because the number of partitions is very high, yet for 40 TB of input (and this number is expected to grow) it seems reasonable:
40 TB / 300,000 gives a partition size of ~133 MB, assuming the data is evenly distributed.
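For the record, the arithmetic (decimal units, even key distribution assumed):

    val totalBytes    = 40e12  // ~40 TB shuffled
    val numPartitions = 300000
    println(f"~${totalBytes / numPartitions / 1e6}%.0f MB per partition") // ~133 MB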
