spark-user mailing list archives

From Bruno Oliveira
Subject Re: [Spark Structured Streaming] retry/replay failed messages
Date Fri, 09 Jul 2021 16:41:04 GMT
Hello! Sure thing!

I'm reading them *separately*; both are apps written with Scala + Spark
Structured Streaming.

I feel like I missed some details in my original message (sorry, it was past
4 AM and it was getting frustrating).
Please let me try to clarify some points:

*Transactions Created Consumer*

Kafka trx-created-topic <--- ConsumerApp (Scala + Spark Structured
Streaming) ---> sinks to ---> Postgres DB table (Transactions)

*Transactions Processed Consumer*

Kafka trx-processed-topic <--- AnotherConsumerApp (Scala + Spark Structured
Streaming), which also reads from the Postgres Trx table:

1) Fetches a Dataset from the topic (let's call it "a")
2) Selects the ids
3) Fetches the rows w/ the matching ids that have status 'created' from the
   Postgres Trx table (let's call it "b")
4) Performs an intersection between "a" and "b", resulting in a
   "b_that_needs_sinking" (but now there are some "b_leftovers" that fell
   outside the intersection)
5) Sinks "b_that_needs_sinking" to the DB, but that leaves the
   "b_leftovers" unprocessed (not persisted)
6) However, those "b_leftovers" could, ultimately, be processed at some
   point (even if it takes like 1-3 days): when their corresponding
   transaction_ids are pushed to the "trx-created-topic" Kafka topic and
   are then processed by that first consumer
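
Steps 4 and 5 above can be sketched in plain Scala as a split of the
micro-batch by whether each id already has a 'created' row. This is only a
minimal illustration, not the actual app code: `ProcessedTrx`, `splitBatch`
and the `payload` field are hypothetical names, and the real job would work
on Spark Datasets rather than in-memory collections.

```scala
// Hypothetical, simplified record type; the real schema is not shown in this thread.
final case class ProcessedTrx(transactionId: String, payload: String)

object IntersectionSketch {
  // Splits micro-batch "a" into (needsSinking, leftovers).
  // createdIds stands for the ids of the rows fetched from Postgres
  // with status 'created' (dataset "b" in the steps above).
  def splitBatch(
      a: Seq[ProcessedTrx],
      createdIds: Set[String]
  ): (Seq[ProcessedTrx], Seq[ProcessedTrx]) =
    a.partition(trx => createdIds.contains(trx.transactionId))
}
```

Keeping the second half of the tuple, instead of discarding it, is what
makes a later retry possible at all.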

So, what I'm trying to accomplish is to find a way to reprocess those
"b_leftovers" *without* having to restart the app.
Does that make sense?
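
One possible shape for such a retry, sketched in plain Scala under stated
assumptions: leftovers are carried forward and re-checked on every
micro-batch, and give up after a fixed number of attempts. `PendingTrx`,
`reconcile` and `maxAttempts` are hypothetical names. In Structured
Streaming this logic could plausibly run inside `foreachBatch`, with the
pending rows kept in a side table rather than in memory, but that wiring is
an assumption and is not shown here.

```scala
// Hypothetical record type: a 'processed' message plus a retry counter.
final case class PendingTrx(transactionId: String, payload: String, attempts: Int = 0)

// Outcome of one pass: rows to sink, rows to retry later, rows that gave up.
final case class ReconcileResult(
    ready: Seq[PendingTrx],
    stillPending: Seq[PendingTrx],
    expired: Seq[PendingTrx]
)

object RetrySketch {
  // One reconciliation pass over earlier leftovers plus the new micro-batch.
  // createdIds stands for the ids currently present in Postgres with status 'created'.
  def reconcile(
      incoming: Seq[PendingTrx],
      pending: Seq[PendingTrx],
      createdIds: Set[String],
      maxAttempts: Int
  ): ReconcileResult = {
    val candidates = pending ++ incoming
    val (ready, unmatched) =
      candidates.partition(t => createdIds.contains(t.transactionId))
    // Count this failed attempt, then separate rows that may still be retried
    // from rows that have used up their attempts.
    val bumped = unmatched.map(t => t.copy(attempts = t.attempts + 1))
    val (stillPending, expired) = bumped.partition(_.attempts < maxAttempts)
    ReconcileResult(ready, stillPending, expired)
  }
}
```

Each pass feeds `stillPending` back in as the next pass's `pending`, so a
leftover is picked up as soon as its 'created' row shows up, without a
restart. What to do with `expired` rows (for example, writing them to a
dead-letter table) is left open.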

PS: It doesn't necessarily have to be real streaming; if micro-batching
(legacy Spark Streaming) would allow such a thing, it would technically
work (although I keep hearing it's not advisable).

Thank you so much!

Kind regards

On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh wrote:

> Can you please clarify if you are reading these two topics separately or
> within the same scala or python script in Spark Structured Streaming?
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira wrote:
>> Hello guys,
>> I've been struggling with this for some days now, without success, so I
>> would highly appreciate any enlightenment. The simplified scenario is the
>> following:
>>    - I've got 2 topics in Kafka (it's already like that in production,
>>    can't change it)
>>       - transactions-created,
>>       - transaction-processed
>>    - Even though the schema is not exactly the same, they all share a
>>    correlation_id, which is their "transaction_id"
>> So, long story short, I've got 2 consumers, one for each topic, and all I
>> wanna do is sink them in a chain order. I'm writing them w/ Spark
>> Structured Streaming, btw
>> So far so good, the caveat here is:
>> - I cannot write a given "*processed*" transaction unless there is an
>> entry of that same transaction with the status "*created*".
>> - There is *no* guarantee that any transactions in the topic
>> "transaction-*processed*" have a match (same transaction_id) in the
>> "transaction-*created*" at the moment the messages are fetched.
>> So the workflow so far is:
>> - Msgs from the "transaction-created" just get synced to postgres, no
>> questions asked
>> - As for the "transaction-processed", it goes as follows:
>>    - a) Messages are fetched from the Kafka topic
>>    - b) Select the transaction_id of those...
>>    - c) Fetch all the rows w/ the corresponding id from a Postgres table
>>    AND that have the status "CREATED"
>>    - d) Then, I pretty much do an intersection between the two datasets,
>>    and sink only the "processed" ones that have a match from step c
>>    - e) Persist the resulting dataset
>> But the rows (from the 'processed') that were not part of the
>> intersection get lost afterwards...
>> So my question is:
>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting the
>> app?
>> - For this scenario, should I fall back to Spark Streaming, instead of
>> Structured Streaming?
>> PS: I was playing around with Spark Streaming (legacy) and managed to
>> commit only the ones in the microbatches that were fully successful (still
>> failed to find a way to "poll" for the uncommitted ones without restarting,
>> though).
>> Thank you very much in advance!
