spark-user mailing list archives

From Abhijeet Kumar <abhijeet.ku...@sentienz.com>
Subject Re: Why does join use rows that were sent after watermark of 20 seconds?
Date Tue, 11 Dec 2018 05:52:09 GMT
You mean to say that Spark will store all the data in memory forever? :)

> On 10-Dec-2018, at 6:16 PM, Sandeep Katta <sandeep0102.opensource@gmail.com> wrote:
> 
> Hi Abhijeet,
> 
> You are using an inner join with unbounded state, which means every record in one stream can match the other stream indefinitely.
> If you want the intended behaviour, you should add a timestamp condition or a window operator to the join condition, for example:
> 
> 
> 
> On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar <abhijeet.kumar@sentienz.com> wrote:
> Hello,
> 
> I’m using watermarks to join two streams, as you can see below:
> 
> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
> val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
> val join_df = order_wm
>   .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
> My understanding of the above code is that it will keep each stream’s data for only 20 seconds after it arrives. But when I send one stream now and the other 20 seconds later, they still get joined. It seems that even after the watermark has expired, Spark is holding the data in memory. I even tried after 45 seconds and those records were joined too.
> 
> I’m sending the streams from two Kafka topics, and I’m populating tstamp_trans with the current timestamp.
> This is creating confusion in my mind about how the watermark works.
> 
> 
> Thank you,
> Abhijeet Kumar

