spark-user mailing list archives

From Arun Mahadevan <ar...@apache.org>
Subject Re: Understanding State Store storage behavior for the Stream Deduplication function
Date Mon, 01 Apr 2019 17:51:27 GMT
Check the number of rows in your state (from query.lastProgress) and see if
it stabilizes after some time. Also look inside the "state" folder of
the checkpoint directory to see if the old delta files are getting cleaned
up (you should see roughly 100 delta files). If both are happening,
the state disk usage should not increase. If not, it might be a bug.
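
The two checks above can be sketched in plain Scala. This is only a rough
sketch under assumptions: the object and method names are hypothetical, the
directory layout state/<operatorId>/<partitionId>/<version>.delta is assumed
to be the default file-based state store layout, and the window and tolerance
values are arbitrary.

```scala
import java.io.File

// Hypothetical helpers, not part of Spark.
object StateStoreChecks {

  // Recursively lists all regular files below `dir`.
  private def listFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty[File])
    children.flatMap(f => if (f.isDirectory) listFiles(f) else Seq(f))
  }

  // Counts "*.delta" files per directory below `stateDir`.
  // Keys are directory paths relative to `stateDir` (assumed to be
  // <operatorId>/<partitionId> in the default layout).
  def deltaFilesPerPartition(stateDir: File): Map[String, Int] = {
    val root = stateDir.toPath
    listFiles(stateDir)
      .filter(_.getName.endsWith(".delta"))
      .groupBy(f => root.relativize(f.getParentFile.toPath).toString)
      .map { case (dir, files) => dir -> files.size }
  }

  // True when the last `window` samples differ by at most
  // `tolerance` (relative to the largest of them).
  def hasStabilized(samples: Seq[Long], window: Int, tolerance: Double): Boolean = {
    if (window < 2 || samples.size < window) false
    else {
      val recent = samples.takeRight(window)
      val lo = recent.min
      val hi = recent.max
      (hi - lo).toDouble <= tolerance * math.max(hi, 1L).toDouble
    }
  }
}
```

In spark-shell, the samples could be collected once per batch from
query.lastProgress.stateOperators.map(_.numRowsTotal).sum, and the delta
count taken with deltaFilesPerPartition(new File("/tmp/spark/state")),
assuming the checkpoint location used in the test below.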

Thanks,
Arun



On Mon, 1 Apr 2019 at 09:48, Gerard Maas <gerard.maas@gmail.com> wrote:

> Hi Ryan,
>
> I ran the test overnight for roughly 8hrs. Using the rate source with
> default values and the 'best effort' trigger, you get a batch/second, so it
> ran for ~28k batches.
> As you can see in the charts, the disk storage keeps on increasing.
>
> I had to drop this issue last week as I had some priority tasks to take
> care of. I'm going to run these tests again on a cluster to reproduce on a
> different system and compare results.
>
> regards, Gerard.
>
>
>
> On Mon, Apr 1, 2019 at 6:20 PM Shixiong(Ryan) Zhu <shixiong@databricks.com>
> wrote:
>
>> How many batches did you run for your test? Spark keeps the 100 latest
>> batches by default. I would expect the disk usage of the distributed file
>> system to be stable after 100 batches as long as the state store size is
>> stable.
>>
>> On Mon, Mar 25, 2019 at 5:00 PM Gerard Maas <gerard.maas@gmail.com>
>> wrote:
>>
>>> Thanks Arun,
>>>
>>> Exploring query progress makes sense to understand the internal
>>> behavior. I'm more interested in resource usage.
>>> We recently faced recurring issues with jobs running out of storage. It
>>> turned out that it was due to the large number of small files that the
>>> state store produces, combined with a slightly increased block size [1].
>>> It was using several GB of disk space of a PVC mounted on GlusterFS to
>>> keep the state of two counts (good/bad samples).
>>>
>>> In the new world of Kubernetes, where resources are explicitly requested
>>> + assigned, stable storage becomes very important.
>>> (I'm wondering how the state store performs on HDFS and its huge block
>>> sizes)
>>>
>>> Going back to the scenario described here, this job will eventually crash
>>> because the disk usage keeps increasing.
>>> In theory, watermarks should prevent that, and I'm wondering why this is
>>> not the case.
>>>
>>> Any ideas? Or should I open a Jira already?
>>>
>>> kr, Gerard.
>>>
>>>
>>>
>>>
>>> On Mon, Mar 25, 2019 at 11:13 PM Arun Mahadevan <arunm@apache.org>
>>> wrote:
>>>
>>>> The disk usage of the state folder might not be the right thing to look
>>>> at. The space usage is dominated by the number of delta files, and
>>>> removes are written out as tombstone records, so you might not see the
>>>> difference until the files are compacted.
>>>>
>>>> You should instead look at the state operator metrics in the query
>>>> progress.
>>>>
>>>> E.g. query.lastProgress
>>>> ..
>>>>   "stateOperators" : [ {
>>>>     "numRowsTotal" : ...,
>>>>     "numRowsUpdated" : ..,
>>>>     "memoryUsedBytes" : ...
>>>>   } ],
>>>> ...
>>>>
>>>> Thanks,
>>>> Arun
>>>>
>>>>
>>>> On Mon, 25 Mar 2019 at 12:18, Gerard Maas <gerard.maas@gmail.com>
>>>> wrote:
>>>>
>>>>> Sparkers,
>>>>>
>>>>> I'm trying to understand the clean-up behavior of the state store for
>>>>> the case of stream deduplication.
>>>>>
>>>>> I'm running a simple test with an unbounded stream and running
>>>>> deduplication on it with and without watermark.
>>>>> While my expectation is that the version with watermark should show a
>>>>> bounded disk usage after the watermark time is reached, it turns out that
>>>>> its disk demand keeps increasing. Even worse, the disk usage is larger than
>>>>> the unbounded version.
>>>>>
>>>>> This test ran for 8hrs, taking a sample of disk usage at OS level (du)
>>>>> for the /state folder created by the query execution.
>>>>>
>>>>> [image: image.png]
>>>>> The code for the test: (executed in spark-shell, in local mode)
>>>>>
>>>>> // with watermark
>>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>>> val stream = spark.readStream.format("rate").load()
>>>>> val dedup = stream.withWatermark("timestamp", "5 minutes").dropDuplicates("value", "timestamp")
>>>>> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start
>>>>>
>>>>> // without watermark
>>>>> spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/spark")
>>>>> spark.conf.set("spark.sql.shuffle.partitions", "10")
>>>>> val stream = spark.readStream.format("rate").load()
>>>>> val dedup = stream.dropDuplicates("value", "timestamp")
>>>>> val query = dedup.writeStream.format("json").option("path", "/tmp/spark/results").start
>>>>>
>>>>> What could be going on here?
>>>>>
>>>>> Thanks,  Gerard.
>>>>>
>>>>>
>>>> --
>>
>> Best Regards,
>> Ryan
>>
>
