spark-user mailing list archives

From Nirmal Manoharan <nirmalmano....@gmail.com>
Subject Spark Structured streaming - dropDuplicates with watermark
Date Tue, 04 Dec 2018 05:00:23 GMT
I am trying to deduplicate streaming data using the dropDuplicates
function with a watermark. The problem I am facing is that I have
two timestamps for a given record:
1. One is the event timestamp (Eventtimestamp): the time the record was
created at the source.
2. The other is the transfer timestamp (TransferTimestamp): the time set by
an intermediate process that is responsible for streaming the data.
The duplicates are introduced during the intermediate stage, so for a given
record and its duplicate the event timestamp is the same but the transfer
timestamp is different.

For the watermark, I would like to use the transfer timestamp because I know
the duplicates can't occur more than 3 minutes apart in transfer. But I can't
use it as a dropDuplicates column, because it won't catch the duplicates: the
duplicates have different transfer timestamps.

Here is an example:

Event 1:
{ "EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00",
  "TransferTimestamp": "2018-11-29T10:05:00.00" }

Event 2 (duplicate):
{ "EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00",
  "TransferTimestamp": "2018-11-29T10:08:00.00" }

In this case, the duplicate was created during transfer, 3 minutes after
the original event was transferred.

My code looks like this:

streamDataset
    .withWatermark("transferTimestamp", "4 minutes")
    .dropDuplicates("eventstring", "transferTimestamp");

The above code won't drop the duplicates, because transferTimestamp differs
between the event and its duplicate. But currently this is the only way, as
Spark forces me to include the watermark column in the dropDuplicates function.

I would really like to see a dropDuplicates implementation like the one below,
which would be valid for any stream with at-least-once semantics: I don't
have to include the watermark field in dropDuplicates, and watermark-based
state eviction is still honored.

streamDataset
    .withWatermark("transferTimestamp", "4 minutes")
    .dropDuplicates("eventstring");
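
To make the semantics I am asking for concrete, here is a minimal plain-Java
sketch of them. It is not Spark code and does not use any Spark API; the class
WatermarkDedupSketch and its methods accept and stateSize are hypothetical
names made up for illustration. It also assumes the watermark advances on
every record, whereas Spark advances it between micro-batches, and it simply
drops records that arrive behind the watermark.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names are Spark APIs.
class WatermarkDedupSketch {
    private final Duration delay;                       // watermark delay, e.g. 4 minutes
    private final Map<String, LocalDateTime> seen = new HashMap<>(); // key -> transfer time
    private LocalDateTime maxTransfer = null;           // largest transfer timestamp so far

    WatermarkDedupSketch(Duration delay) {
        this.delay = delay;
    }

    /** Returns true if the record is emitted, false if it is dropped. */
    boolean accept(String eventString, LocalDateTime transferTimestamp) {
        // Advance the watermark from the transfer timestamp only.
        if (maxTransfer == null || transferTimestamp.isAfter(maxTransfer)) {
            maxTransfer = transferTimestamp;
        }
        LocalDateTime watermark = maxTransfer.minus(delay);

        // Evict keys whose transfer timestamp has fallen behind the watermark.
        seen.values().removeIf(ts -> ts.isBefore(watermark));

        // Sketch choice: a record older than the watermark is dropped outright.
        if (transferTimestamp.isBefore(watermark)) {
            return false;
        }
        // Deduplicate on the event string alone, not on the timestamp.
        if (seen.containsKey(eventString)) {
            return false;
        }
        seen.put(eventString, transferTimestamp);
        return true;
    }

    int stateSize() {
        return seen.size();
    }

    public static void main(String[] args) {
        WatermarkDedupSketch dedup = new WatermarkDedupSketch(Duration.ofMinutes(4));
        // Event 1 and its duplicate from the example above.
        System.out.println(dedup.accept("example1", LocalDateTime.of(2018, 11, 29, 10, 5)));
        System.out.println(dedup.accept("example1", LocalDateTime.of(2018, 11, 29, 10, 8)));
        // A later, unrelated record pushes the watermark past example1.
        System.out.println(dedup.accept("example2", LocalDateTime.of(2018, 11, 29, 10, 20)));
        System.out.println(dedup.stateSize());
    }
}
```

With the two example events, the first is emitted and the second is dropped
even though their transfer timestamps differ, and the key for example1 is
evicted once the watermark moves past 10:05.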

If anyone has an alternative solution for this, please let me know. I can't
use the event timestamp because it is not ordered and its time range varies
drastically (delayed events and junk events).

Thanks in advance
-Nirmal
