spark-issues mailing list archives

From "Andrew Korzhuev (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-23682) Memory issue with Spark structured streaming
Date Wed, 28 Mar 2018 14:55:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474 ]

Andrew Korzhuev commented on SPARK-23682:
-----------------------------------------

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured Streaming, stream-stream join

I managed to track the leak down to
[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:

_private lazy val loadedMaps = new mutable.HashMap[Long, MapType]_

which appears not to clean up the _UnsafeRow_ instances coming from:

{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
{code}
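To illustrate the growth pattern I suspect, here is a minimal toy model in Java. It is not Spark's code: the class name VersionedStateCache, its methods, and the maxVersionsToRetain parameter are all hypothetical, and plain strings stand in for UnsafeRow. It only shows how a version-keyed map of full state snapshots keeps growing when old versions are never evicted, and how a retention bound limits it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a per-version state cache. NOT Spark's implementation.
public class VersionedStateCache {
    // version -> full snapshot of the state at that version (stand-in for loadedMaps)
    private final TreeMap<Long, Map<String, String>> loadedMaps = new TreeMap<>();
    // Assumed knob for this sketch only; a value <= 0 means old versions are never evicted.
    private final int maxVersionsToRetain;

    public VersionedStateCache(int maxVersionsToRetain) {
        this.maxVersionsToRetain = maxVersionsToRetain;
    }

    // Commit a new version: copy the latest snapshot, apply the updates, cache the result.
    public synchronized void commit(long version, Map<String, String> updates) {
        Map<String, String> snapshot = new ConcurrentHashMap<>();
        Map.Entry<Long, Map<String, String>> latest = loadedMaps.lastEntry();
        if (latest != null) {
            snapshot.putAll(latest.getValue());
        }
        snapshot.putAll(updates);
        loadedMaps.put(version, snapshot);
        // Evict the oldest versions only when a retention bound is configured.
        if (maxVersionsToRetain > 0) {
            while (loadedMaps.size() > maxVersionsToRetain) {
                loadedMaps.pollFirstEntry();
            }
        }
    }

    // Number of versions currently held in memory.
    public synchronized int cachedVersions() {
        return loadedMaps.size();
    }

    // Total number of key/value entries held across all cached versions.
    public synchronized long retainedEntries() {
        long total = 0;
        for (Map<String, String> snapshot : loadedMaps.values()) {
            total += snapshot.size();
        }
        return total;
    }

    public static void main(String[] args) {
        VersionedStateCache unbounded = new VersionedStateCache(0);
        VersionedStateCache bounded = new VersionedStateCache(2);
        for (long v = 1; v <= 100; v++) {
            Map<String, String> updates = Collections.singletonMap("key-" + v, "value-" + v);
            unbounded.commit(v, updates);
            bounded.commit(v, updates);
        }
        System.out.println("unbounded: " + unbounded.cachedVersions() + " versions, "
                + unbounded.retainedEntries() + " entries");
        System.out.println("bounded:   " + bounded.cachedVersions() + " versions, "
                + bounded.retainedEntries() + " entries");
    }
}
```

In this model the unbounded cache holds every snapshot ever committed, so the retained entries grow roughly quadratically with the number of batches, while the bounded cache only holds the most recent snapshots. Whether the real provider behaves this way needs to be confirmed against the Spark source.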
I noticed that memory leaks more slowly when uploads are buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload                       true
spark.hadoop.fs.s3a.fast.upload.buffer                disk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both the number of objects
and the total volume grow indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!

> Memory issue with Spark structured streaming
> --------------------------------------------
>
>                 Key: SPARK-23682
>                 URL: https://issues.apache.org/jira/browse/SPARK-23682
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled
-XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Memory, memory, memory-leak
>         Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 2018-03-10 at
18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png,
Screen Shot 2018-03-28 at 16.44.20.png, Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> There seems to be a memory issue in structured streaming. A stream with aggregation
(dropDuplicates()) and data partitioning constantly increases memory usage, and eventually the executors
fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason:
Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal.
Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> The stream creation looks something like this:
> {quote}session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     .dropDuplicates("testId", "testName")
>     .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType),
"YYYY"))
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .partitionBy("year")
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .queryName("test-stream")
>     .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, which
is referenced from [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance this looks normal, since that is how Spark keeps aggregation keys in
memory. However, I did my testing by renaming files in the source folder so that they would be
picked up by Spark again. Since the input records are the same, all further rows should be rejected
as duplicates and memory consumption should not increase, but that is not what happens. Moreover, GC time
took more than 30% of total processing time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

