spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bruno Oliveira <>
Subject [Spark Structured Streaming] retry/replay failed messages
Date Fri, 09 Jul 2021 07:40:49 GMT
Hello guys,

I've been struggling with this for some days now, without success, so I
would highly appreciate any enlightenment. The simplified scenario is the

   - I've got 2 topics in Kafka (it's already like that in production,
   can't change it)
      - transactions-created,
      - transaction-processed
   - Even though the schema is not exactly the same, they all share a
   correlation_id, which is their "transaction_id"

So, long story short, I've got 2 consumers, one for each topic, and all I
wanna do is sink them in a chain order. I'm writing them w/ Spark
Structured Streaming, btw

So far so good, the caveat here is:

- I cannot write a given "*processed" *transaction unless there is an entry
of that same transaction with the status "*created*".

- There is *no* guarantee that any transactions in the topic "transaction-
*processed*" have a match (same transaction_id) in the "transaction-
*created*" at the moment the messages are fetched.

So the workflow so far is:
- Msgs from the "transaction-created" just get synced to postgres, no
questions asked

- As for the "transaction-processed", it goes as follows:

   - a) Messages are fetched from the Kafka topic
   - b) Select the transaction_id of those...
   - c) Fetch all the rows w/ the corresponding id from a Postgres table
   AND that have the status "CREATED"
   - d) Then, a pretty much do a intersection between the two datasets, and
   sink only on "processed" ones that have with step c
   - e) Persist the resulting dataset

But the rows (from the 'processed') that were not part of the intersection
get lost afterwards...

So my question is:
- Is there ANY way to reprocess/replay them at all WITHOUT restarting the
- For this scenario, should I fall back to Spark Streaming, instead of
Structured Streaming?

PS: I was playing around with Spark Streaming (legacy) and managed to
commit only the ones in the microbatches that were fully successful (still
failed to find a way to "poll" for the uncommitted ones without restarting,

Thank you very much in advance!

View raw message