How many batches did you run for your test? Spark keeps the 100 latest batches by default, so I would expect the disk usage on the distributed file system to stabilize after 100 batches, as long as the state store size itself is stable.
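
(If I remember correctly, that retention is controlled by spark.sql.streaming.minBatchesToRetain — a minimal sketch in spark-shell, assuming the default of 100:)

// Spark keeps state/checkpoint files for at least this many recent batches
// before cleaning them up; the default is 100.
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100")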

On Mon, Mar 25, 2019 at 5:00 PM Gerard Maas <gerard.maas@gmail.com> wrote:
Thanks Arun,

Exploring query progress makes sense to understand the internal behavior. I'm more interested in resource usage. 
We recently faced recurring issues with jobs running out of storage. It turned out to be due to the large number of small files that the state store produces, combined with a slightly increased block size [1].
It was using several GB of disk space on a PVC mounted on GlusterFS just to keep the state of two counts (good/bad samples).

In the new world of Kubernetes, where resources are explicitly requested + assigned, stable storage becomes very important.
(I'm wondering how the state store performs on HDFS and its huge block sizes)

Going back to the scenario described here, this job will eventually crash because the disk usage keeps increasing. 
In theory, watermarks should prevent that, and I'm wondering why this is not the case.
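
(One quick way to check whether the watermark is actually advancing — a minimal sketch, assuming the running query handle is called `query`:)

// The progress report exposes the current event-time watermark when one is defined.
val progress = query.lastProgress
if (progress != null) {
  val wm = progress.eventTime.get("watermark")
  println(s"current watermark: $wm")
}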

Any ideas? Or should I open a Jira already?

kr, Gerard.




On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <arunm@apache.org> wrote:
The disk usage of the state folder might not be the right metric to look at. The space usage is dominated by the number of delta files, and removes are written out as tombstone records, so you might not see the difference until the files are compacted.
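
(For context, delta files are only consolidated into a snapshot periodically — a sketch of the relevant setting, assuming the default HDFS-backed state store provider:)

// Number of delta files to accumulate before they are consolidated into a snapshot;
// the default is 10.
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")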

You should instead look at the state operator metrics in the query progress.

E.g. query.lastProgress
..
  "stateOperators" : [ {
    "numRowsTotal" : ...,
    "numRowsUpdated" : ..,
    "memoryUsedBytes" : ...
  } ],
...
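
(A minimal way to pull these programmatically in spark-shell, assuming the query handle is named `query`:)

// Each entry in stateOperators is a StateOperatorProgress exposing row counts and memory usage.
Option(query.lastProgress).foreach { p =>
  p.stateOperators.foreach { op =>
    println(s"numRowsTotal=${op.numRowsTotal} numRowsUpdated=${op.numRowsUpdated} memoryUsedBytes=${op.memoryUsedBytes}")
  }
}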

Thanks,
Arun


On Mon, 25 Mar 2019 at 12:18, Gerard Maas <gerard.maas@gmail.com> wrote:
Sparkers,

I'm trying to understand the clean-up behavior of the state store for the case of stream deduplication.

I'm running a simple test with an unbounded stream and applying deduplication to it, with and without a watermark.
My expectation was that the version with the watermark would show bounded disk usage once the watermark interval is reached, but it turns out that its disk usage keeps increasing. Even worse, its disk usage is larger than that of the version without a watermark.

The test ran for 8 hours, sampling disk usage at the OS level (du) on the /state folder created by the query execution.

[attached chart: disk usage of the /state folder over time, with and without watermark]
The code for the test (executed in spark-shell, in local mode):

// with watermark: the event-time column is part of the dedup keys,
// so state older than the watermark should be evicted
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
spark.conf.set("spark.sql.shuffle.partitions", "10")
val stream = spark.readStream.format("rate").load()
val dedup = stream.withWatermark("timestamp", "5 minutes").dropDuplicates("value", "timestamp")
val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()

// without watermark: all dedup state is retained indefinitely
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
spark.conf.set("spark.sql.shuffle.partitions", "10")
val stream = spark.readStream.format("rate").load()
val dedup = stream.dropDuplicates("value", "timestamp")
val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()

What could be going on here? 

Thanks,  Gerard.
 
--

Best Regards,

Ryan