spark-user mailing list archives

From Burak Yavuz <>
Subject Re: Structured Streaming: distinct (Spark 2.2)
Date Mon, 19 Mar 2018 19:18:03 GMT
I believe the docs are out of date regarding distinct. The behavior should
be as follows:

 - Distinct should be applied across triggers
 - In order to prevent the state from growing indefinitely, you need to add
a watermark
 - If you don't have a watermark, but your key space is small, that's also fine
 - If a record arrives and is not in the state, it will be output
 - If a record arrives and is in the state, it will be ignored
 - Once the watermark passes for a key, it will be dropped from state
 - If a record arrives late, i.e. after the watermark, it will be ignored
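
The rules above can be sketched as a small pure-Python model. This is only an illustration of the described semantics, not Spark's implementation: the class `StreamingDistinct`, its `delay` parameter, and the `(key, event_time)` record shape are hypothetical names chosen for the example. It also assumes the watermark is the maximum event time seen so far minus a fixed delay, and that it advances only at the end of each trigger.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, float]  # (key, event_time); hypothetical record shape


@dataclass
class StreamingDistinct:
    """Toy model of distinct across triggers with an optional watermark."""
    delay: Optional[float] = None  # None = no watermark, state is never dropped
    state: Dict[str, float] = field(default_factory=dict)  # key -> event time
    max_event_time: float = float("-inf")

    def watermark(self) -> float:
        # Assumption: watermark = max event time seen so far minus the delay.
        if self.delay is None:
            return float("-inf")
        return self.max_event_time - self.delay

    def process_trigger(self, records: List[Record]) -> List[Record]:
        wm = self.watermark()  # watermark carried over from earlier triggers
        emitted = []
        for key, event_time in records:
            if event_time < wm:
                continue  # late record: ignored
            if key in self.state:
                continue  # already seen: ignored
            self.state[key] = event_time  # new key: remember it and emit it
            emitted.append((key, event_time))
            # (late and duplicate records never reach this point)
        # Advance the watermark, then drop keys that it has passed.
        for _, event_time in records:
            self.max_event_time = max(self.max_event_time, event_time)
        new_wm = self.watermark()
        self.state = {k: t for k, t in self.state.items() if t >= new_wm}
        return emitted


dedup = StreamingDistinct(delay=10)
first = dedup.process_trigger([("a", 100), ("b", 101), ("a", 102)])
second = dedup.process_trigger([("a", 105), ("c", 120)])
third = dedup.process_trigger([("b", 95), ("a", 115)])
```

With `delay=None` the model never evicts anything, which is the "small key space" case: it stays correct, but the state grows with every new key. In Spark itself I would expect the equivalent to be a `withWatermark` call on the event-time column before `distinct()` or `dropDuplicates()`; that mapping is my reading rather than something spelled out in this thread.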


On Mon, Mar 19, 2018 at 12:04 PM, Geoff Von Allmen <> wrote:

> I see in the documentation that the distinct operation is not supported
> in Structured Streaming. That being said, I have noticed that you are able
> to successfully call distinct() on a data frame and it seems to perform
> the desired operation and doesn’t fail with the AnalysisException as
> expected. If I call it with a column name specified, then it will fail with
> AnalysisException.
> I am using Structured Streaming to read from a Kafka stream and my
> question (and concern) is that:
>    - The distinct operation is properly applied across the *current*
>    batch as read from Kafka; however, the distinct operation would not
>    apply across batches.
> I have tried the following:
>    - Started the streaming job to see my baseline data and left the job
>    streaming
>    - Created events in Kafka that would increment my counts if distinct
>    was not performing as expected
>    - Results:
>       - Distinct still seems to be working over the entire data set even
>       as I add new data.
>       - As I add new data, I see Spark process the data (I’m doing output
>       mode = update) but there are no new results, indicating that the distinct
>       function is in fact still working across batches as Spark pulls in the new
>       data from Kafka.
> Does anyone know more about the intended behavior of distinct in
> Structured Streaming?
> If this is working as intended, does this mean I could have a dataset that
> is growing without bound being held in memory/disk or something to that
> effect (so it has some way to make that distinct operation against previous
> data)?
