spark-user mailing list archives

From Joe Ammann <...@pyx.ch>
Subject Spark structured streaming leftOuter join not working as I expect
Date Tue, 04 Jun 2019 12:31:11 GMT
Hi all

sorry, tl;dr

I'm working on my first Python Spark structured streaming app, which in the end joins
messages from ~10 different Kafka topics. I've recently upgraded to Spark 2.4.3, which
has resolved all the issues with time handling (watermarks, join windows) I had before
with Spark 2.3.2.
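
For context, each input stream is created from Kafka roughly like this (broker address,
topic name and payload parsing are placeholders, I'm only sketching the shape of the
setup):

     from pyspark.sql import SparkSession

     spark = SparkSession.builder.appName("my-join-app").getOrCreate()

     # subscribe to one input topic; broker and topic names are placeholders
     aStream = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("subscribe", "topic_a")
         .load()
         # the JSON payload then gets parsed into columns such as ID and LAST_MOD
         .selectExpr("CAST(value AS STRING) AS json"))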

My current problem happens during a leftOuter join, where messages from 3 topics are joined,
the results are then aggregated with a groupBy and finally put onto a result Kafka topic.
On the 3 input topics involved, all messages have ID and LAST_MOD fields. I use the ID for
joining, and the LAST_MOD as event timestamp on all incoming streams. Since the fields on
the incoming messages are all named the same (ID and LAST_MOD), I rename them on all incoming
streams with

     aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For those data frames, I then set the watermark with A/B/C_LAST_MOD as the event time,
before joining. I know that the LAST_MOD timestamps are equal on the messages that I
want to join together.
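
Concretely, that part looks roughly like this (the 10-second watermark threshold is just
an example value):

     # rename the common columns and set the watermark on each input stream
     aDf = (aStream
         .selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD")
         .drop("ID", "LAST_MOD")
         .withWatermark("A_LAST_MOD", "10 seconds"))

     bDf = (bStream
         .selectExpr("*", "ID as B_ID", "LAST_MOD as B_LAST_MOD")
         .drop("ID", "LAST_MOD")
         .withWatermark("B_LAST_MOD", "10 seconds"))

     cDf = (cStream
         .selectExpr("*", "ID as C_ID", "LAST_MOD as C_LAST_MOD")
         .drop("ID", "LAST_MOD")
         .withWatermark("C_LAST_MOD", "10 seconds"))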

The first join is an inner join, where a field on stream A links with the ID of stream B.
So I have

     resultDf = (aDf
        .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
        .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
        .agg(
            collect_list(struct("*")).alias("RESULTS"),
            count("A_ID").alias("NUM_RESULTS"),
            # just add a timestamp to watermark on; they are all the same
            min("A_LAST_MOD").alias("RESULT_LAST_MOD")
            )
        .withWatermark("RESULT_LAST_MOD", "30 seconds")
        )

This works perfectly and generates (on my current data set) some 10'000 records. This is the
expected result.
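
The result is then written to the output Kafka topic, roughly like this (topic name and
checkpoint location are placeholders):

     # publish the aggregated rows to the result topic; names are placeholders
     query = (resultDf
         .selectExpr("to_json(struct(*)) AS value")
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("topic", "result_topic")
         .option("checkpointLocation", "/tmp/checkpoints/result")
         .outputMode("append")
         .start())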

When I add the leftOuter join of the third topic as follows

     resultDf = (aDf
        .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
        # here the additional left join; C_FK is the field in stream B
        .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
        .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
        .agg(
            collect_list(struct("*")).alias("RESULTS"),
            count("A_ID").alias("NUM_RESULTS"),
            # just add a timestamp to watermark on; they are all the same
            min("A_LAST_MOD").alias("RESULT_LAST_MOD")
            )
        .withWatermark("RESULT_LAST_MOD", "30 seconds")
        )

then what I would expect is that I get the same number of output records (~10'000), with
some of them having the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records: exactly those which have
a successful join on records from topic C. The others are not shown in the output.

I already tried

   * making sure that the optional FK on topic B is never null, by using an
     NVL2(C_FK, C_FK, 'FFFF')
   * widening the time window join on the leftOuter to "B_LAST_MOD < C_LAST_MOD
     - interval 5 seconds ..." (see the sketch below)
   * using various combinations of joinWindows and watermarkLateThreshold
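
For the second point, one variant I tried looked roughly like this (abDf stands for the
result of the first inner join; the 5-second bounds are example values):

     from pyspark.sql.functions import expr

     # widened time-range condition on the left outer join; bounds are examples
     joined = abDf.join(
         cDf,
         expr("""
             C_FK = C_ID AND
             C_LAST_MOD >= B_LAST_MOD - interval 5 seconds AND
             C_LAST_MOD <= B_LAST_MOD + interval 5 seconds
         """),
         "leftOuter")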

The result is always the same: I'm "losing" the ~8'500 records for which the optional join
FK is NULL on topic B.

Did I totally misunderstand the concept of stream-stream left outer joins? Or what else
could be wrong?

-- 
CU, Joe

