spark-user mailing list archives

From Adam Tobey <adam.to...@datadoghq.com.INVALID>
Subject [Debug] [Spark Core 2.4.4] org.apache.spark.storage.BlockException: Negative block size -9223372036854775808
Date Mon, 29 Jun 2020 21:05:17 GMT
Hi,

I'm encountering a strange exception in Spark 2.4.4 (on AWS EMR 5.29):
org.apache.spark.storage.BlockException: Negative block size
-9223372036854775808.
I've seen this mostly from this line (for remote blocks):
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$splitLocalRemoteBlocks$3.apply(ShuffleBlockFetcherIterator.scala:295)
But also from this line (for local blocks):
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$splitLocalRemoteBlocks$3.apply(ShuffleBlockFetcherIterator.scala:281)
The block size of -9223372036854775808 (Long.MinValue) is the same every
time.

I only see this exception coming from a single physical node in every
cluster (i3.16xlarge EC2 instance hosting multiple executors), but it
affects multiple executors across separate jobs running on this node over
relatively long periods of time (e.g. 1+ hours) and outlives the first
executors that encounter the exception. This has happened on multiple EMR
clusters. We have dynamic allocation enabled, so it could be related
somehow to the external shuffle service, which would continue running
across these jobs. We are also using Kryo as the serializer.

This exception occurs in multiple stages, but all these stages are reading
shuffle output from a single stage with 15,000 partitions. When this
exception occurs, the job does not fail, but it loses shuffle data between
stages (the number of shuffle records written from upstream stages is
slightly more than the number read) and the job output becomes corrupted.
Re-running the job on a new cluster produces correct output as long as this
exception is never thrown.

From reading the code, it seems to me the only possible way to have
Long.MinValue as a block size is from the avgSize of a
HighlyCompressedMapStatus since the size compression scheme of taking log
base 1.1 cannot produce a negative size (negative inputs map to 1:
https://github.com/apache/spark/blob/v2.4.4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L75-L95).
I don't see how the average computation itself can output Long.MinValue due
to the size checks above, even in case of overflow (
https://github.com/apache/spark/blob/v2.4.4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L203-L240
).
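 

To make the reasoning above concrete, here is a minimal Python model of the two
code paths linked above. It is a sketch transliterated from my reading of the
Scala source, not the Spark implementation itself: the function names, the
`threshold` parameter (standing in for the accurate-block threshold), and the
helpers that emulate JVM 64-bit arithmetic are all assumptions made for
illustration.

```python
import math

LOG_BASE = 1.1
LONG_MIN = -2**63  # -9223372036854775808, i.e. Long.MinValue on the JVM


def compress_size(size: int) -> int:
    """Encode a block size as one unsigned byte (0..255) using log base 1.1."""
    if size == 0:
        return 0
    if size <= 1:  # this branch also catches every negative input
        return 1
    return min(255, math.ceil(math.log(size) / math.log(LOG_BASE)))


def decompress_size(compressed: int) -> int:
    """Decode the byte back to an approximate size; the result is never negative."""
    if compressed == 0:
        return 0
    return int(LOG_BASE ** (compressed & 0xFF))


def wrap_to_long(value: int) -> int:
    """Emulate signed 64-bit wraparound, since Python ints do not overflow."""
    return (value + 2**63) % 2**64 - 2**63


def jvm_div(a: int, b: int) -> int:
    """Emulate JVM long division, which truncates toward zero."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b > 0) else -q


def avg_small_block_size(sizes, threshold: int) -> int:
    """Average of the blocks with 0 < size < threshold (assumed model of avgSize)."""
    total = 0
    count = 0
    for s in sizes:
        if 0 < s < threshold:  # non-positive sizes are treated as empty blocks
            total = wrap_to_long(total + s)
            count += 1
    return jvm_div(total, count) if count > 0 else 0


if __name__ == "__main__":
    # A negative size round-trips to 1, not to a negative number.
    print(decompress_size(compress_size(LONG_MIN)))
    # Even a sum that wraps to Long.MinValue is divided by a count >= 2.
    print(avg_small_block_size([2**62, 2**62], threshold=2**63))
```

Under this model, a single small block yields a positive average, and a sum
that wraps around is always divided by a count of at least 2, so neither path
returns Long.MinValue. That suggests the value is introduced somewhere outside
these two functions, although the model may be missing a detail of the real
code.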

Does anyone have ideas as to how this block size of Long.MinValue is
possible? Thanks!
