spark-user mailing list archives

From Arun Mahadevan <ar...@apache.org>
Subject Re: Understanding State Store storage behavior for the Stream Deduplication function
Date Mon, 25 Mar 2019 22:13:26 GMT
The disk usage of the state folder might not be the right metric to look at.
The space usage is dominated by the number of delta files, and removes are
written out as tombstone records rather than freeing space immediately. You
might not see the difference until the delta files are compacted.
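
For reference, the compaction and clean-up cadence is governed by a few stock
Spark configs (a minimal sketch, assuming the default HDFS-backed state store
provider; the values shown are the defaults, and they should be set before
starting the query):

// how many delta files accumulate before being compacted into a snapshot
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")
// how many recent state versions (delta/snapshot files) are retained
// before the maintenance task deletes older ones
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100")
// how often the background maintenance (snapshotting + cleanup) runs
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "60s")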

You should instead look at the state operator metrics in the query progress.

E.g. query.lastProgress
..
  "stateOperators" : [ {
    "numRowsTotal" : ...,
    "numRowsUpdated" : ..,
    "memoryUsedBytes" : ...
  } ],
...
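
If you want to sample those metrics over time instead of running du, here is a
minimal sketch (Scala, in the spark-shell; it assumes the running query is
bound to query, as in your code below):

// poll the state operator metrics once a minute for ten minutes
for (_ <- 1 to 10) {
  // lastProgress is null until the first batch completes
  Option(query.lastProgress).foreach { p =>
    p.stateOperators.foreach { s =>
      println(s"numRowsTotal=${s.numRowsTotal} " +
        s"numRowsUpdated=${s.numRowsUpdated} " +
        s"memoryUsedBytes=${s.memoryUsedBytes}")
    }
  }
  Thread.sleep(60 * 1000L)
}

With the watermark in place, numRowsTotal is the figure that should level off
once old state starts getting evicted.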

Thanks,
Arun


On Mon, 25 Mar 2019 at 12:18, Gerard Maas <gerard.maas@gmail.com> wrote:

> Sparkers,
>
> I'm trying to understand the clean-up behavior of the state store in the
> case of stream deduplication.
>
> I'm running a simple test on an unbounded stream, deduplicating it with and
> without a watermark.
> My expectation was that the watermarked version would show bounded disk
> usage once the watermark interval had passed, but its disk usage keeps
> increasing. Even worse, it uses more disk than the version without a
> watermark.
>
> The test ran for 8 hours, sampling the disk usage at the OS level (du) of
> the /state folder created by the query execution.
>
> [image: chart of /state folder disk usage over the 8-hour run]
> The code for the test: (executed in spark-shell, in local mode)
>
> // with watermark
> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
> spark.conf.set("spark.sql.shuffle.partitions", "10")
> val stream = spark.readStream.format("rate").load()
> val dedup = stream.withWatermark("timestamp", "5 minutes").dropDuplicates("value", "timestamp")
> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()
>
> // without watermark
> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
> spark.conf.set("spark.sql.shuffle.partitions", "10")
> val stream = spark.readStream.format("rate").load()
> val dedup = stream.dropDuplicates("value", "timestamp")
> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()
>
> What could be going on here?
>
> Thanks,  Gerard.
>
>
