spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Understanding State Store storage behavior for the Stream Deduplication function
Date Tue, 26 Mar 2019 00:00:16 GMT
Thanks Arun,

Exploring query progress makes sense to understand the internal behavior.
I'm more interested in resource usage.
We recently faced recurring issues with jobs running out of storage. It
turned out that it was due to the large number of small files that the
state store produces, combined with a slightly increased block size [1].
It was using several GB of disk space on a PVC mounted on GlusterFS just to
keep the state of two counts (good/bad samples).
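
For what it's worth, these are the knobs I've been experimenting with to limit
the number of retained state files (a quick sketch from the spark-shell; I'm
assuming these config names and defaults are right for our Spark version):

// Fewer retained batches should mean fewer delta files kept on disk (default is 100, as far as I know)
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "10")
// How often the state store maintenance task snapshots/cleans old versions (default 60s, I believe)
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "30s")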

In the new world of Kubernetes, where resources are explicitly requested and
assigned, stable storage becomes very important.
(I'm wondering how the state store performs on HDFS, with its huge block
sizes.)

Going back to the scenario described here: this job will eventually crash
because its disk usage keeps increasing.
In theory, watermarks should prevent that, and I'm wondering why this is
not the case.
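
In case it helps to diagnose, this is roughly how I'm checking whether the
watermark advances and whether state rows ever get dropped (a sketch against
the StreamingQueryProgress API; field names as I recall them):

val p = query.lastProgress
// the event-time watermark reported for the last trigger (only present when a watermark is set)
println(p.eventTime.get("watermark"))
// state operator metrics: if numRowsTotal keeps growing, old keys are never evicted
p.stateOperators.foreach { s =>
  println(s"total=${s.numRowsTotal} updated=${s.numRowsUpdated} memory=${s.memoryUsedBytes}")
}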

Any ideas? Or should I open a Jira already?

kr, Gerard.




On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <arunm@apache.org> wrote:

> The disk usage of the state folder might not be the right thing to look at.
> The space usage is dominated by the number of delta files, and removes are
> written out as tombstone records, so you might not see the difference until
> the files are compacted.
>
> You should instead look at the state operator metrics in the query
> progress.
>
> E.g. query.lastProgress
> ..
>   "stateOperators" : [ {
>     "numRowsTotal" : ...,
>     "numRowsUpdated" : ..,
>     "memoryUsedBytes" : ...
>   } ],
> ...
>
> Thanks,
> Arun
>
>
> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <gerard.maas@gmail.com> wrote:
>
>> Sparkers,
>>
>> I'm trying to understand the clean-up behavior of the state store for the
>> case of stream deduplication.
>>
>> I'm running a simple test with an unbounded stream and applying
>> deduplication to it with and without a watermark.
>> My expectation was that the version with a watermark would show bounded
>> disk usage once the watermark delay has passed, but it turns out that its
>> disk demand keeps increasing. Even worse, its disk usage is larger than
>> that of the version without a watermark.
>>
>> The test ran for 8 hours, sampling disk usage at the OS level (du) for
>> the /state folder created by the query execution.
>>
>> [image: image.png]
>> The code for the test: (executed in spark-shell, in local mode)
>>
>> // with watermark
>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>> val stream = spark.readStream.format("rate").load()
>> val dedup = stream.withWatermark("timestamp", "5 minutes").dropDuplicates("value", "timestamp")
>> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()
>>
>> // without watermark
>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>> val stream = spark.readStream.format("rate").load()
>> val dedup = stream.dropDuplicates("value", "timestamp")
>> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()
>>
>> What could be going on here?
>>
>> Thanks,  Gerard.
>>
>>
>
