spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Understanding State Store storage behavior for the Stream Deduplication function
Date Mon, 01 Apr 2019 16:47:49 GMT
Hi Ryan,

I ran the test overnight for roughly 8 hours. Using the rate source with
default values and the default 'best effort' trigger, you get about one
batch per second, so it ran for ~28,800 batches (8 × 3,600 seconds).
As you can see in the charts, the disk storage keeps on increasing.
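
For reference, the defaults the test relies on can be made explicit; this
is equivalent to the plain format("rate").load() used in the test below
(rowsPerSecond is the rate source's own option, and 1 is its default):

// One row per second, matching the rate source defaults.
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()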

I had to drop this issue last week as I had some priority tasks to take
care of. I'm going to run these tests again on a cluster to reproduce on a
different system and compare results.

regards, Gerard.



On Mon, Apr 1, 2019 at 6:20 PM Shixiong(Ryan) Zhu <shixiong@databricks.com>
wrote:

> How many batches did you run for your test? Spark keeps the 100 latest
> batches by default. I would expect the disk usage on the distributed file
> system to be stable after 100 batches, as long as the state store size is
> stable.
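>
> A minimal sketch of lowering that retention, assuming the standard
> spark.sql.streaming.minBatchesToRetain setting is what controls it:
>
> // Keep fewer batch versions than the default of 100, so files for old
> // state versions become eligible for cleanup sooner (set before start()).
> spark.conf.set("spark.sql.streaming.minBatchesToRetain", "20")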
>
> On Mon, Mar 25, 2019 at 5:00 PM Gerard Maas <gerard.maas@gmail.com> wrote:
>
>> Thanks Arun,
>>
>> Exploring query progress makes sense to understand the internal behavior.
>> I'm more interested in resource usage.
>> We recently faced recurring issues with jobs running out of storage. It
>> turned out to be caused by the large number of small files that the
>> state store produces, combined with a slightly increased block size [1].
>> The query was using several GB of disk space on a PVC mounted on
>> GlusterFS just to keep the state of two counts (good/bad samples).
>>
>> In the new world of Kubernetes, where resources are explicitly requested
>> and assigned, stable storage usage becomes very important.
>> (I'm also wondering how the state store performs on HDFS, given its much
>> larger block sizes.)
>>
>> Going back to the scenario described here: this job will eventually crash
>> because its disk usage keeps increasing.
>> In theory, the watermark should prevent that, and I'm wondering why that
>> is not the case.
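>>
>> A minimal sketch of checking whether the watermark is actually advancing,
>> using the standard query progress (the guards are just defensive):
>>
>> // eventTime carries a "watermark" entry once a watermark is in effect.
>> val p = query.lastProgress
>> if (p != null && p.eventTime != null) {
>>   println(s"current watermark: ${p.eventTime.get("watermark")}")
>> }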
>>
>> Any ideas? Or should I open a Jira already?
>>
>> kr, Gerard.
>>
>>
>>
>>
>> On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <arunm@apache.org> wrote:
>>
>>> The disk usage of the state folder might not be the right metric to look
>>> at. The space usage is dominated by the delta files, and removes are
>>> written out as tombstone records, so you might not see any difference
>>> until the delta files are compacted.
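>>>
>>> A minimal sketch, assuming spark.sql.streaming.stateStore.minDeltasForSnapshot
>>> is the setting that controls how many delta files accumulate before they
>>> are compacted into a snapshot:
>>>
>>> // Compact more eagerly than the default of 10 deltas, so the effect of
>>> // tombstoned removes shows up in the on-disk footprint sooner.
>>> spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "5")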
>>>
>>> You should instead look at the state operator metrics in the query
>>> progress.
>>>
>>> E.g. query.lastProgress
>>> ..
>>>   "stateOperators" : [ {
>>>     "numRowsTotal" : ...,
>>>     "numRowsUpdated" : ..,
>>>     "memoryUsedBytes" : ...
>>>   } ],
>>> ...
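>>>
>>> For example, a minimal sketch that samples those fields while the query
>>> runs (the polling loop itself is just illustrative):
>>>
>>> // numRowsTotal should plateau once the watermark starts evicting state.
>>> while (query.isActive) {
>>>   val p = query.lastProgress
>>>   if (p != null) p.stateOperators.foreach { op =>
>>>     println(s"total=${op.numRowsTotal} updated=${op.numRowsUpdated} mem=${op.memoryUsedBytes}")
>>>   }
>>>   Thread.sleep(60 * 1000)
>>> }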
>>>
>>> Thanks,
>>> Arun
>>>
>>>
>>> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <gerard.maas@gmail.com> wrote:
>>>
>>>> Sparkers,
>>>>
>>>> I'm trying to understand the clean-up behavior of the state store for
>>>> the case of stream deduplication.
>>>>
>>>> I'm running a simple test with an unbounded stream, applying
>>>> deduplication to it both with and without a watermark.
>>>> My expectation is that the version with a watermark should show bounded
>>>> disk usage once the watermark interval has passed. Instead, its disk
>>>> usage keeps increasing, and, even worse, it is larger than that of the
>>>> version without a watermark.
>>>>
>>>> This test ran for 8 hours, sampling disk usage at the OS level (du) for
>>>> the /state folder created by the query execution.
>>>>
>>>> [image: chart of /state folder disk usage sampled over the 8-hour run]
>>>> The code for the test (executed in spark-shell, in local mode):
>>>>
>>>> // with watermark
>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>> val stream = spark.readStream.format("rate").load()
>>>> val dedup = stream.withWatermark("timestamp", "5 minutes").dropDuplicates("value", "timestamp")
>>>> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()
>>>>
>>>> // without watermark
>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>> val stream = spark.readStream.format("rate").load()
>>>> val dedup = stream.dropDuplicates("value", "timestamp")
>>>> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start()
>>>>
>>>> What could be going on here?
>>>>
>>>> Thanks,  Gerard.
>>>>
>>>>
>>> --
>
> Best Regards,
> Ryan
>
