spark-user mailing list archives

From GmailLiang <>
Subject Unsubscribe
Date Tue, 04 Dec 2018 12:41:37 GMT

Sent from Tianchu(Alex) iPhone

On Dec 4, 2018, at 00:00, Nirmal Manoharan <> wrote:

I am trying to deduplicate streaming data using the dropDuplicates function with a watermark.
The problem I am facing is that I have two timestamps for a given record:
1. One is the event timestamp (Eventtimestamp): the time the record was created at the source.
2. The other is a transfer timestamp (TransferTimestamp): the time assigned by an intermediate
process that is responsible for streaming the data.
The duplicates are introduced during the intermediate stage, so for a given record and its
duplicate, the event timestamp is the same but the transfer timestamp is different.

For the watermark, I would like to use the transfer timestamp, because I know the duplicates
can't occur more than 3 minutes apart in transfer. But I can't use it within dropDuplicates,
because it won't catch the duplicates: each duplicate has a different transfer timestamp.

Here is an example,
        Event 1: {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00"}
        Event 2 (duplicate): {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"}

In this case, the duplicate was created during transfer, 3 minutes after the original record
was transferred.

My code is like below,
    .withWatermark("transferTimestamp", "4 minutes")

The above code won't drop the duplicates, because transferTimestamp differs between the event
and its duplicate. But currently this is the only way, as Spark forces me to include the
watermark column in the dropDuplicates function.
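
To make the problem concrete, here is a small plain-Python illustration (not Spark code) using the two example records above. The helper name `distinct_keys` is hypothetical and only exists for this sketch; it shows that a key including the transfer timestamp treats the two records as distinct, while a key without it treats them as one.

```python
# Plain-Python illustration (not Spark code) of why the key choice matters.
# The two records are the example events from this message.
events = [
    {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00",
     "TransferTimestamp": "2018-11-29T10:05:00.00"},
    {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00",
     "TransferTimestamp": "2018-11-29T10:08:00.00"},
]


def distinct_keys(records, columns):
    """Return the set of distinct key tuples built from the given columns."""
    return {tuple(record[column] for column in columns) for record in records}


# Key that includes the transfer timestamp: both records look unique.
with_transfer = distinct_keys(
    events, ["EventString", "Eventtimestamp", "TransferTimestamp"])

# Key without the transfer timestamp: the duplicate collapses into one key.
without_transfer = distinct_keys(events, ["EventString", "Eventtimestamp"])

print(len(with_transfer), len(without_transfer))
```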

I would really like to see a dropDuplicates implementation like the one below, which would be
valid for any stream with at-least-once semantics: I would not have to include the watermark
field in dropDuplicates, and the watermark-based state eviction would still be honored.
    .withWatermark("transferTimestamp", "4 minutes")
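
The behaviour requested here can be modelled in a few lines of plain Python. This is only a sketch of the semantics under stated assumptions, not Spark's implementation or API: the class name `WatermarkDeduplicator` is hypothetical, the watermark is recomputed per record rather than per micro-batch, and records that arrive behind the watermark are not handled specially. The key excludes the transfer timestamp, while the transfer timestamp alone drives state eviction.

```python
from datetime import datetime, timedelta

TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


class WatermarkDeduplicator:
    """Hypothetical model: dedup on a key, evict state by a separate watermark column."""

    def __init__(self, delay):
        self.delay = delay           # allowed lateness, e.g. 4 minutes
        self.max_transfer_ts = None  # largest transfer timestamp seen so far
        self.seen = {}               # key -> transfer timestamp of first occurrence

    def process(self, record):
        """Return True if the record is emitted, False if dropped as a duplicate."""
        transfer_ts = datetime.strptime(record["TransferTimestamp"], TS_FORMAT)
        # The deduplication key deliberately excludes the transfer timestamp.
        key = (record["EventString"], record["Eventtimestamp"])

        if self.max_transfer_ts is None or transfer_ts > self.max_transfer_ts:
            self.max_transfer_ts = transfer_ts
        watermark = self.max_transfer_ts - self.delay

        # Evict keys whose first occurrence is older than the watermark.
        self.seen = {k: ts for k, ts in self.seen.items() if ts >= watermark}

        if key in self.seen:
            return False
        self.seen[key] = transfer_ts
        return True


dedup = WatermarkDeduplicator(timedelta(minutes=4))
stream = [
    {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00",
     "TransferTimestamp": "2018-11-29T10:05:00.00"},
    {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00",
     "TransferTimestamp": "2018-11-29T10:08:00.00"},  # duplicate, 3 minutes later
    {"EventString": "example2", "Eventtimestamp": "2018-11-29T09:00:00.00",
     "TransferTimestamp": "2018-11-29T10:20:00.00"},  # advances the watermark
]
results = [dedup.process(record) for record in stream]
state_size = len(dedup.seen)
print(results, state_size)
```

In this model the duplicate is dropped because its key is still in state when it arrives, and the state for example1 is evicted once the transfer-time watermark moves past it, so the state stays bounded even though the event timestamp is unordered.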

If anyone has an alternative solution for this, please let me know. I can't use the event
timestamp for the watermark, as it is not ordered and its time range varies drastically
(delayed events and junk events).

Thanks in advance
