[Debug] [Spark Core 2.4.4] org.apache.spark.storage.BlockException: Negative block size -9223372036854775808

Adam Tobey

I'm encountering a strange exception in Spark 2.4.4 (on AWS EMR 5.29):
org.apache.spark.storage.BlockException: Negative block size -9223372036854775808.
I've seen it thrown mostly from this line (for remote blocks): org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$splitLocalRemoteBlocks$3.apply(ShuffleBlockFetcherIterator.scala:295)
But also from this line (for local blocks): org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$splitLocalRemoteBlocks$3.apply(ShuffleBlockFetcherIterator.scala:281)
The block size of -9223372036854775808 (Long.MinValue) is the same every time.
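For reference, here is a simplified Java sketch of the kind of guard that raises this exception. It assumes the check in splitLocalRemoteBlocks rejects negative sizes (and zero sizes, which should already have been filtered out), as the message above suggests. The BlockException class here is a hypothetical stand-in rather than Spark's own class, and the block IDs are made up. The sketch also prints the bit pattern of Long.MinValue (Long.MIN_VALUE in Java), which has only the sign bit set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NegativeBlockSizeCheck {

    // Hypothetical stand-in for org.apache.spark.storage.BlockException.
    static class BlockException extends RuntimeException {
        final String blockId;

        BlockException(String blockId, String message) {
            super(message);
            this.blockId = blockId;
        }
    }

    // Sketch of the size guard: a negative reported size is rejected, and a zero
    // size is rejected too because empty blocks should have been excluded earlier.
    static void validateSizes(Map<String, Long> blockSizes) {
        for (Map.Entry<String, Long> e : blockSizes.entrySet()) {
            long size = e.getValue();
            if (size < 0) {
                throw new BlockException(e.getKey(), "Negative block size " + size);
            } else if (size == 0) {
                throw new BlockException(e.getKey(), "Zero-sized blocks should be excluded.");
            }
        }
    }

    public static void main(String[] args) {
        // Scala's Long.MinValue is Java's Long.MIN_VALUE: -2^63, only the sign bit set.
        System.out.println(Long.MIN_VALUE);                   // -9223372036854775808
        System.out.println(Long.toHexString(Long.MIN_VALUE)); // 8000000000000000

        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("shuffle_0_0_0", 1024L);           // hypothetical healthy block
        sizes.put("shuffle_0_1_0", Long.MIN_VALUE);  // hypothetical bad block
        try {
            validateSizes(sizes);
        } catch (BlockException ex) {
            System.out.println(ex.blockId + " -> " + ex.getMessage());
        }
    }
}
```

Because the guard only looks at the sign, any negative size trips it; the notable part of the report is that the value is always exactly the sign-bit-only pattern.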

I only see this exception coming from a single physical node in each cluster (an i3.16xlarge EC2 instance hosting multiple executors), but on that node it affects multiple executors across separate jobs over relatively long periods (e.g. 1+ hours), and it persists after the first executors that hit it are gone. This has happened on multiple EMR clusters. We have dynamic allocation enabled, so it could somehow be related to the external shuffle service, which keeps running across these jobs. We are also using Kryo as the serializer.

This exception occurs in multiple stages, but all these stages are reading shuffle output from a single stage with 15,000 partitions. When this exception occurs, the job does not fail, but it loses shuffle data between stages (the number of shuffle records written from upstream stages is slightly more than the number read) and the job output becomes corrupted. Re-running the job on a new cluster produces correct output as long as this exception is never thrown.
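The data-loss symptom described above (fewer shuffle records read than written) can be checked mechanically. Below is a minimal Java sketch. It assumes you have the upstream stage's shuffle-write record count and each downstream stage's shuffle-read record count (for example, copied from the Spark UI), and that each downstream stage reads every partition of that upstream shuffle exactly once. The class name, stage names and counts are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ShuffleRecordAudit {

    // Returns the downstream stages whose shuffle-read record count differs from the
    // upstream stage's shuffle-write record count. Assumes each listed stage reads
    // every partition of that one upstream shuffle exactly once.
    static List<String> stagesWithMismatch(long recordsWritten, Map<String, Long> recordsReadByStage) {
        return recordsReadByStage.entrySet().stream()
                .filter(e -> e.getValue() != recordsWritten)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical counts, not taken from the affected jobs.
        long written = 1_000_000L;
        Map<String, Long> read = new LinkedHashMap<>();
        read.put("stage 7", 1_000_000L);
        read.put("stage 9", 999_874L);

        for (String stage : stagesWithMismatch(written, read)) {
            // Prints "stage 9 is missing 126 records" for the counts above.
            System.out.println(stage + " is missing " + (written - read.get(stage)) + " records");
        }
    }
}
```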

From reading the code, it seems to me the only possible source of Long.MinValue as a block size is the avgSize of a HighlyCompressedMapStatus, since the log-base-1.1 size compression scheme cannot produce a negative size (negative inputs map to 1: https://github.com/apache/spark/blob/v2.4.4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L75-L95). I also don't see how the average computation itself can output Long.MinValue, given the size checks that precede it, even in case of overflow (https://github.com/apache/spark/blob/v2.4.4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L203-L240).
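Both arguments can be sanity-checked with a small Java transliteration. It assumes that MapStatus.compressSize/decompressSize in v2.4.4 behave as in the first link (log base 1.1, capped at 255, stored in one byte). It also assumes that HighlyCompressedMapStatus averages only non-empty blocks below spark.shuffle.accurateBlockThreshold, whose default is taken here to be 100 MiB. This is a sketch for reasoning, not Spark's code, and averageSmallBlockSize is a hypothetical helper name.

```java
public class MapStatusSizes {

    // Logarithm base used by the size compression (assumed 1.1, per MapStatus.scala).
    static final double LOG_BASE = 1.1;

    // 0 stays 0; anything <= 1, including negative inputs, maps to 1;
    // otherwise ceil(log_1.1(size)) capped at 255 and stored as a byte.
    static byte compressSize(long size) {
        if (size == 0) {
            return 0;
        } else if (size <= 1L) {
            return 1;
        } else {
            return (byte) Math.min(255, (int) Math.ceil(Math.log(size) / Math.log(LOG_BASE)));
        }
    }

    // The byte is read as unsigned (0..255) and expanded as 1.1^n, so the result is >= 0.
    static long decompressSize(byte compressed) {
        if (compressed == 0) {
            return 0;
        }
        return (long) Math.pow(LOG_BASE, compressed & 0xFF);
    }

    // Only non-empty blocks smaller than the threshold contribute, and the result is a
    // plain long division, so it lies in [0, threshold). With each size below the
    // threshold, the sum cannot overflow for any realistic partition count.
    static long averageSmallBlockSize(long[] sizes, long threshold) {
        long total = 0;
        long count = 0;
        for (long size : sizes) {
            if (size > 0 && size < threshold) {
                total += size;
                count += 1;
            }
        }
        return count > 0 ? total / count : 0;
    }

    public static void main(String[] args) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (int b = 0; b <= 255; b++) {
            long s = decompressSize((byte) b);
            min = Math.min(min, s);
            max = Math.max(max, s);
        }
        System.out.println("decompressed range: " + min + " .. " + max);
        System.out.println("compressSize(-5) = " + compressSize(-5)); // 1

        long threshold = 100L * 1024 * 1024; // assumed default accurateBlockThreshold
        long[] sizes = {0, 10, 30, 500L * 1024 * 1024}; // hypothetical block sizes
        System.out.println("avg = " + averageSmallBlockSize(sizes, threshold)); // 20
    }
}
```

Every byte decodes to a value between 0 and roughly 3.6e10, and the average stays below the threshold. This is consistent with the reasoning that neither path should yield Long.MinValue by itself.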

Does anyone have ideas as to how this block size of Long.MinValue is possible? Thanks!