spark-user mailing list archives

From Bruno Oliveira <bruno.ar...@gmail.com>
Subject Re: [Spark Structured Streaming] retry/replay failed messages
Date Fri, 09 Jul 2021 23:36:10 GMT
I mean... I guess?

But I don't really have Airflow here, and I didn't really want to fall
back to a "batch"-kinda approach with Airflow
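
For reference, the journal-table idea quoted further down (park the unpaired rows in a journal table, then pair them on a schedule) could be sketched roughly as follows. This is only an illustration under stated assumptions: SQLite from the Python standard library stands in for Postgres, and the table and column names (`transactions`, `journal`, `status`, `ts`) as well as the function `pair_journal` are hypothetical, not the real schema.

```python
import sqlite3


def pair_journal(conn: sqlite3.Connection) -> int:
    """Move journal rows whose transaction_id already has a CREATED row.

    Returns how many journal rows were paired in this run.
    """
    with conn:  # the insert and the delete commit (or roll back) together
        pairable = conn.execute(
            "SELECT j.transaction_id, j.ts FROM journal j "
            "WHERE EXISTS (SELECT 1 FROM transactions t "
            "              WHERE t.transaction_id = j.transaction_id "
            "                AND t.status = 'CREATED')"
        ).fetchall()
        conn.executemany(
            "INSERT INTO transactions (transaction_id, status, ts) "
            "VALUES (?, 'PROCESSED', ?)",
            pairable,
        )
        conn.executemany(
            "DELETE FROM journal WHERE transaction_id = ?",
            [(tx_id,) for tx_id, _ in pairable],
        )
    return len(pairable)


# Tiny in-memory demo: ID 1 has a CREATED row, ID 3 does not (yet).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE transactions (transaction_id INTEGER, status TEXT, ts TEXT);
    CREATE TABLE journal (transaction_id INTEGER, ts TEXT);
    INSERT INTO transactions VALUES (1, 'CREATED', '2020-04-04 11:01:00');
    INSERT INTO journal VALUES (1, '2020-04-04 11:03:00');
    INSERT INTO journal VALUES (3, '2020-04-04 13:03:00');
    """
)
first_run = pair_journal(conn)
remaining = [row[0] for row in conn.execute("SELECT transaction_id FROM journal")]
```

Anything that can run a function every x minutes (cron, a loop with a sleep) could call `pair_journal`; the sketch itself does not depend on Airflow.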

I'd rather use a Dead Letter Queue approach instead (like I mentioned:
another topic for the failed ones, which is later consumed and pumps
the messages back to the original topic),
or something with Spark + Delta Lake...
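
A minimal sketch of that Dead Letter Queue loop, in plain Python rather than Spark: deques stand in for the Kafka topics and a set stands in for "IDs that have status CREATED in Postgres". All names here (`route_batch`, `consume_processed`, `reinject_retries`) are hypothetical and only illustrate the routing logic.

```python
from collections import deque


def route_batch(batch, created_ids):
    """Split one micro-batch into (sinkable, retry) by transaction_id."""
    sinkable = [m for m in batch if m["transaction_id"] in created_ids]
    retry = [m for m in batch if m["transaction_id"] not in created_ids]
    return sinkable, retry


# In-memory stand-ins (assumption): deques play the role of the Kafka topics,
# a set plays the role of the CREATED rows in the database.
processed_topic = deque({"transaction_id": i} for i in (1, 2, 3))
retry_topic = deque()
created_ids = {1, 2}
sunk = []


def consume_processed():
    """Main consumer: sink what matches, park the rest on the retry topic."""
    batch = list(processed_topic)
    processed_topic.clear()
    sinkable, retry = route_batch(batch, created_ids)
    sunk.extend(sinkable)
    retry_topic.extend(retry)


def reinject_retries():
    """Second consumer: pump parked messages back to the original topic."""
    while retry_topic:
        processed_topic.append(retry_topic.popleft())


consume_processed()   # IDs 1 and 2 are sunk, ID 3 is parked
reinject_retries()    # ID 3 goes back to the original topic
created_ids.add(3)    # the late CREATED row finally lands
consume_processed()   # ID 3 is now sunk as well
```

In a real deployment the re-injection would probably need a delay or an attempt counter, otherwise a message whose CREATED row is hours away would circle between the two topics continuously.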

I was just hoping I could retry/replay these "orphaned"
transactions somewhat more easily...

*Question)* Those features of "Stateful Streaming" or "Continuous
Processing" mode wouldn't help solve my case, would they?
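
To make the question concrete: conceptually, what a stateful operator would add is a per-transaction_id buffer that holds a "processed" event until its "created" counterpart shows up, or until a timeout expires. That assumes both topics are read by the same query. The sketch below is plain Python, not Spark API; `PendingBuffer` and its methods are hypothetical names used only to illustrate the idea.

```python
class PendingBuffer:
    """Holds 'processed' events until the matching 'created' event arrives."""

    def __init__(self, max_wait_seconds):
        self.max_wait_seconds = max_wait_seconds
        self.created = set()   # transaction_ids seen on the created stream
        self.pending = {}      # transaction_id -> (event, arrival_time)

    def on_created(self, transaction_id):
        """Register a created event; release a buffered match if there is one."""
        self.created.add(transaction_id)
        entry = self.pending.pop(transaction_id, None)
        return [entry[0]] if entry else []

    def on_processed(self, event, now):
        """Emit the event if its created row is known, otherwise buffer it."""
        if event["transaction_id"] in self.created:
            return [event]
        self.pending[event["transaction_id"]] = (event, now)
        return []

    def expire(self, now):
        """Drop and return events that waited longer than max_wait_seconds."""
        expired = [
            tx_id
            for tx_id, (_, arrived) in self.pending.items()
            if now - arrived > self.max_wait_seconds
        ]
        return [self.pending.pop(tx_id)[0] for tx_id in expired]


buf = PendingBuffer(max_wait_seconds=3600)
early = buf.on_processed({"transaction_id": 3}, now=0)   # no match yet: buffered
released = buf.on_created(3)                             # match arrives: released
buf.on_processed({"transaction_id": 4}, now=0)           # never matched
timed_out = buf.expire(now=7200)                         # waited too long
```

The trade-off is the size of that buffer: if a "created" message can be hours or days late, the state has to be kept for just as long, and the expired events still need somewhere to go (for example the retry topic mentioned above).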

On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Well this is a matter of using journal entries.
>
> What you can do is that those "orphaned" transactions that you cannot pair
> through transaction_id can be written to a journal table in your Postgres
> DB. Then you can pair them with the entries in the relevant Postgres table.
> If the essence is not time critical this can be done through a scheduling
> job every x minutes through airflow or something similar on the database
> alone.
>
> HTH
>
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira <bruno.ariev@gmail.com> wrote:
>
>> That is exactly the case, Sebastian!
>>
>> - In practise, that "created" means "*authorized*", but I still cannot
>> deduct anything from the customer balance
>> - the "processed" means I can safely deduct the transaction_amount  from
>> the customer balance,
>> - and the "refunded" means I must give the transaction amount back to the
>> customer balance
>>
>> So, technically, we cannot process something that is not "AUTHORIZED"
>> (created) yet, nor can we process a refund for a transaction that has NOT
>> been PROCESSED yet.
>>
>>
>> *You have an authorisation, then the actual transaction and maybe a
>>> refund some time in the future. You want to proceed with a transaction only
>>> if you've seen the auth but in an eventually consistent system this might
>>> not always happen.*
>>
>>
>> That's absolutely the case! So, yes, That's correct.
>>
>> *You are asking in the case of receiving the transaction before the auth
>>> how to retry later? *
>>
>>
>> Yeah! I've been struggling for days on how to solve this with Spark
>> Structured Streaming...
>>
>> *Right now you are discarding those transactions that didn't match so you
>>> instead would need to persist them somewhere and either reinject them into
>>> the job that does lookup (say after x minutes) *
>>
>>
>>
>> *Right now, the best I could think of is: *
>>
>>    - Say, I'm reading the messages w/ transaction_id [1, 2, 3] from
>>    Kafka (topic "transactions-processed")
>>    - Then I'm querying the database for these IDs that have the status
>>    "CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
>>    transactions for IDs [1, 2]
>>    - So, while it'll work for the ones with ID [1, 2], I would have to
>>    put that transaction_id 3 in another topic, say,
>>    "*transaction-processed-retry*"
>>    - And write yet another consumer, to fetch the messages from that
>>    "*transaction-processed-retry*" and put them back to the original topic
>>    (transactions-processed)
>>    - And do something similar for the transactions-refunded
>>
>> *Q1) *I think this approach may work, but I can't stop thinking I'm
>> overengineering this, and was wondering if there isn't a better approach...
>> ?
>>
>> *Is this what you are looking for?*
>>
>>
>> Yes, that's exactly it.
>>
>>
>> *Q2)* I know that, under the hood, Structured Streaming is actually
>> using the micro-batch engine,
>>          if I switched to *Continuous Processing*, would it make any
>> difference? Would it allow me any "retry" mechanism out of the box?
>>
>> *Q3)* I stumbled upon a *Stateful Streaming* (
>> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
>> , but I have never ever used it before,
>>         would that actually do something for my case (retrying/replaying
>> a given message) ?
>>
>>
>> Thank you very, VERY much in advance!
>> Best regards
>>
>>
>> On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu <sebastian.piu@gmail.com>
>> wrote:
>>
>>> So in payment systems you have something similar I think
>>>
>>> You have an authorisation, then the actual transaction and maybe a
>>> refund some time in the future. You want to proceed with a transaction only
>>> if you've seen the auth but in an eventually consistent system this might
>>> not always happen.
>>>
>>> You are asking in the case of receiving the transaction before the auth
>>> how to retry later?
>>>
>>> Right now you are discarding those transactions that didn't match so you
>>> instead would need to persist them somewhere and either reinject them into
>>> the job that does lookup (say after x minutes)
>>>
>>> Is this what you are looking for?
>>>
>>> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, <bruno.ariev@gmail.com>
>>> wrote:
>>>
>>>> I'm terribly sorry, Mich. That was my mistake.
>>>> The timestamps are not the same (I copy&pasted without realizing that,
>>>> I'm really sorry for the confusion)
>>>>
>>>> Please assume NONE of the following transactions are in the database yet
>>>>
>>>> *transactions-created:*
>>>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>>>> 11:01:00" }
>>>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>>>> 08:02:00" }
>>>>
>>>> *transactions-processed: *
>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" }     // so
>>>> it's processed 2 minutes after it was created
>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }     // so
>>>> it's processed 4 hours after it was created
>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }    //
>>>> cannot be persisted into the DB yet, because this "transaction_id 3" with
>>>> the status "CREATED" does NOT exist in the DB
>>>>
>>>>
>>>> *(...) Transactions-created are created at the same time (the same
>>>>> timestamp) but you have NOT received them and they don't yet exist in
>>>>> your DB (...)*
>>>>
>>>> - Not at the same timestamp, that was my mistake.
>>>> - Imagine two transactions with the same ID (neither of them are in any
>>>> Kafka topic yet),
>>>>
>>>>    - One with the status CREATED, and another with the status
>>>>    PROCESSED,
>>>>    - The one with the status PROCESSED will ALWAYS have a
>>>>    higher/greater timestamp than the one with the status CREATED
>>>>    - Now for whatever reason, this happens:
>>>>       - Step a) some producer *fails* to push the *created* one to the
>>>>       topic  *transactions-created, it will RETRY, and will eventually
>>>>       succeed, but that can take minutes, or hours*
>>>>       - Step b) however, the producer *succeeds* in pushing the*
>>>>       'processed' *one to the topic *transactions-processed *
>>>>
>>>>
>>>> *(...) because presumably your relational database is too slow to
>>>>> ingest them? (...)*
>>>>
>>>>
>>>> - it's not like the DB was slow, it was because the message for
>>>> transaction_id 3 didn't arrive at the *topic-created *yet, due to some
>>>> error/failure in Step A, for example
>>>>
>>>>
>>>> * you do a query in Postgres for say transaction_id 3 but they don't
>>>>> exist yet? When are they expected to arrive?*
>>>>
>>>>
>>>> - That's correct. It could take minutes, maybe hours. But it is
>>>> guaranteed that at some point, in the future, they will arrive. I just have
>>>> to keep trying until it works, i.e. until this transaction_id 3 with the
>>>> status CREATED arrives at the database
>>>>
>>>>
>>>> Huge apologies for the confusion... Is it a bit more clear now?
>>>>
>>>> *PS:* This is a simplified scenario; in practise, there is yet another
>>>> topic for "transactions-refunded", which cannot be sunk to the DB
>>>> unless the same transaction_id with the status "PROCESSED" is there (but
>>>> again, there can only be a transaction_id PROCESSED if the same
>>>> transaction_id with CREATED exists in the DB)
>>>>
>>>>
>>>> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> One second
>>>>>
>>>>> The topic called transactions_processed is streaming through Spark.
>>>>> Transactions-created are created at the same time (the same timestamp)
>>>>> but you have NOT received them and they don't yet exist in your DB,
>>>>> because presumably your relational database is too slow to ingest them?
>>>>> You do a query in Postgres for say transaction_id 3 but they don't exist
>>>>> yet? When are they expected to arrive?
>>>>>
>>>>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ariev@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the quick reply!
>>>>>>
>>>>>> I'm not sure I got the idea correctly... but from what I'm
>>>>>> understanding, wouldn't that actually end the same way?
>>>>>> Because, this is the current scenario:
>>>>>>
>>>>>> *transactions-processed: *
>>>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>>>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>>>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>>>>>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>>>>>
>>>>>> *transactions-created:*
>>>>>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>>>>>> 11:01:00" }
>>>>>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>>>>>> 12:02:00" }
>>>>>>
>>>>>> - So, when I fetch ALL messages from both topics, there are still 2x
>>>>>> transactions (id: "*3*" and "*4*") which do *not* exist in the topic
>>>>>> "transaction-created" yet (and they aren't in Postgres either)
>>>>>> - But since they were pulled by "Structured Streaming" already,
>>>>>> they'll be kinda marked as "processed" by the Spark Structured Streaming
>>>>>> checkpoint anyway.
>>>>>>
>>>>>> And therefore, I can't replay/reprocess them again...
>>>>>>
>>>>>> Is my understanding correct? Am I missing something here?
>>>>>>
>>>>>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the details.
>>>>>>>
>>>>>>> Can you read these in the same app? For example (this is PySpark, but
>>>>>>> it serves the purpose):
>>>>>>>
>>>>>>> Read topic "newtopic" in one micro-batch and the other topic "md" in
>>>>>>> another micro-batch
>>>>>>>
>>>>>>>         # assumes: import sys; from pyspark.sql.functions import col, from_json
>>>>>>>         try:
>>>>>>>             # process topic --> newtopic
>>>>>>>             streamingNewtopic = self.spark \
>>>>>>>                 .readStream \
>>>>>>>                 .format("kafka") \
>>>>>>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>>>>>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>>>>>                 .option("group.id", config['common']['newtopic']) \
>>>>>>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>                 .option("subscribe", config['MDVariables']['newtopic']) \
>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>                 .option("includeHeaders", "true") \
>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>                 .load() \
>>>>>>>                 .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>>>>>>>
>>>>>>>             # construct a streaming dataframe streamingDataFrame that
>>>>>>>             # subscribes to topic config['MDVariables']['topic'] -> md (market data)
>>>>>>>             streamingDataFrame = self.spark \
>>>>>>>                 .readStream \
>>>>>>>                 .format("kafka") \
>>>>>>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>>>>>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>                 .option("subscribe", config['MDVariables']['topic']) \
>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>                 .option("includeHeaders", "true") \
>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>                 .load() \
>>>>>>>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>>>>>>
>>>>>>>             streamingNewtopic.printSchema()
>>>>>>>
>>>>>>>             # Now do a writeStream and call the relevant functions
>>>>>>>             # to process dataframes
>>>>>>>
>>>>>>>             # each micro-batch of "newtopic" is handed to sendToControl
>>>>>>>             newtopicResult = streamingNewtopic.select( \
>>>>>>>                      col("newtopic_value.uuid").alias("uuid") \
>>>>>>>                    , col("newtopic_value.timeissued").alias("timeissued") \
>>>>>>>                    , col("newtopic_value.queue").alias("queue") \
>>>>>>>                    , col("newtopic_value.status").alias("status")). \
>>>>>>>                      writeStream. \
>>>>>>>                      outputMode('append'). \
>>>>>>>                      option("truncate", "false"). \
>>>>>>>                      foreachBatch(sendToControl). \
>>>>>>>                      trigger(processingTime='2 seconds'). \
>>>>>>>                      queryName(config['MDVariables']['newtopic']). \
>>>>>>>                      start()
>>>>>>>
>>>>>>>             # each micro-batch of "md" is handed to sendToSink
>>>>>>>             result = streamingDataFrame.select( \
>>>>>>>                      col("parsed_value.rowkey").alias("rowkey") \
>>>>>>>                    , col("parsed_value.ticker").alias("ticker") \
>>>>>>>                    , col("parsed_value.timeissued").alias("timeissued") \
>>>>>>>                    , col("parsed_value.price").alias("price")). \
>>>>>>>                      writeStream. \
>>>>>>>                      outputMode('append'). \
>>>>>>>                      option("truncate", "false"). \
>>>>>>>                      foreachBatch(sendToSink). \
>>>>>>>                      trigger(processingTime='30 seconds'). \
>>>>>>>                      option('checkpointLocation', checkpoint_path). \
>>>>>>>                      queryName(config['MDVariables']['topic']). \
>>>>>>>                      start()
>>>>>>>             print(result)
>>>>>>>
>>>>>>>         except Exception as e:
>>>>>>>             print(f"""{e}, quitting""")
>>>>>>>             sys.exit(1)
>>>>>>>
>>>>>>> Inside that function, say *sendToSink*, you can get the df and batchId
>>>>>>>
>>>>>>> def sendToSink(df, batchId):
>>>>>>>     # only act on non-empty micro-batches
>>>>>>>     if len(df.take(1)) > 0:
>>>>>>>         print(f"""md batchId is {batchId}""")
>>>>>>>         df.show(100, False)
>>>>>>>         df.persist()
>>>>>>>         # write to BigQuery batch table
>>>>>>>         s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>>>>>>>         df.unpersist()
>>>>>>>         print(f"""wrote to DB""")
>>>>>>>     else:
>>>>>>>         print("DataFrame md is empty")
>>>>>>>
>>>>>>> And you have created DF from the other topic newtopic
>>>>>>>
>>>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>>>     if(len(dfnewtopic.take(1))) > 0:
>>>>>>>         ......
>>>>>>>
>>>>>>> Now you have two dataframes, *df* and *dfnewtopic*, in the same
>>>>>>> session. Will you be able to join these two dataframes through a
>>>>>>> common key value?
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ariev@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello! Sure thing!
>>>>>>>>
>>>>>>>> I'm reading them *separately*, both are apps written with Scala +
>>>>>>>> Spark Structured Streaming.
>>>>>>>>
>>>>>>>> I feel like I missed some details on my original thread (sorry, it
>>>>>>>> was past 4 AM) and it was getting frustrating.
>>>>>>>> Please let me try to clarify some points:
>>>>>>>>
>>>>>>>> *Transactions Created Consumer*
>>>>>>>>
>>>>>>>> Kafka trx-created-topic <--- ConsumerApp (Scala + Spark Structured
>>>>>>>> Streaming) ---> Sinks to ---> Postgres DB Table (Transactions)
>>>>>>>>
>>>>>>>> *Transactions Processed Consumer*
>>>>>>>>
>>>>>>>> 1) AnotherConsumerApp (Scala + Spark Structured Streaming) fetches a
>>>>>>>>    Dataset from Kafka trx-processed-topic (let's call it "a")
>>>>>>>> 2) Selects the ids
>>>>>>>> 3) Fetches, from the Postgres Trx table, the rows w/ the matching ids
>>>>>>>>    that have status 'created' (let's call it "b")
>>>>>>>> 4) Performs an intersection between "a" and "b" resulting in a
>>>>>>>>    "b_that_needs_sinking" (but now there's some "b_leftovers" that
>>>>>>>>    were out of the intersection)
>>>>>>>> 5) Sinks "b_that_needs_sinking" to DB, but that leaves the
>>>>>>>>    "b_leftovers" as unprocessed (not persisted)
>>>>>>>> 6) However, those "b_leftovers" would, ultimately, be processed at
>>>>>>>>    some point (even if it takes like 1-3 days) - when their
>>>>>>>>    corresponding transaction_id are pushed to the "trx-created-topic"
>>>>>>>>    Kafka topic, and are then processed by that first consumer
>>>>>>>>
>>>>>>>> So, what I'm trying to accomplish is find a way to reprocess those
>>>>>>>> "b_leftovers" *without* having to restart the app.
>>>>>>>> Does that make sense?
>>>>>>>>
>>>>>>>> PS: It doesn't necessarily have to be real streaming; if
>>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it
>>>>>>>> would technically work (although I keep hearing it's not advisable)
>>>>>>>>
>>>>>>>> Thank you so much!
>>>>>>>>
>>>>>>>> Kind regards
>>>>>>>>
>>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Can you please clarify if you are reading these two topics
>>>>>>>>> separately or within the same Scala or Python script in Spark
>>>>>>>>> Structured Streaming?
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ariev@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello guys,
>>>>>>>>>>
>>>>>>>>>> I've been struggling with this for some days now, without
>>>>>>>>>> success, so I would highly appreciate any enlightenment. The
>>>>>>>>>> simplified scenario is the following:
>>>>>>>>>>
>>>>>>>>>>    - I've got 2 topics in Kafka (it's already like that in
>>>>>>>>>>    production, can't change it)
>>>>>>>>>>       - transactions-created,
>>>>>>>>>>       - transaction-processed
>>>>>>>>>>    - Even though the schema is not exactly the same, they all
>>>>>>>>>>    share a correlation_id, which is their "transaction_id"
>>>>>>>>>>
>>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic,
>>>>>>>>>> and all I wanna do is sink them in a chain order. I'm writing them
>>>>>>>>>> w/ Spark Structured Streaming, btw
>>>>>>>>>>
>>>>>>>>>> So far so good, the caveat here is:
>>>>>>>>>>
>>>>>>>>>> - I cannot write a given "*processed*" transaction unless there
>>>>>>>>>> is an entry of that same transaction with the status "*created*".
>>>>>>>>>>
>>>>>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>>>>>> "transaction-*processed*" have a match (same transaction_id) in
>>>>>>>>>> the "transaction-*created*" at the moment the messages are
>>>>>>>>>> fetched.
>>>>>>>>>>
>>>>>>>>>> So the workflow so far is:
>>>>>>>>>> - Msgs from the "transaction-created" just get synced to
>>>>>>>>>> postgres, no questions asked
>>>>>>>>>>
>>>>>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>>>>>
>>>>>>>>>>    - a) Messages are fetched from the Kafka topic
>>>>>>>>>>    - b) Select the transaction_id of those...
>>>>>>>>>>    - c) Fetch all the rows w/ the corresponding id from a
>>>>>>>>>>    Postgres table AND that have the status "CREATED"
>>>>>>>>>>    - d) Then, I pretty much do an intersection between the two
>>>>>>>>>>    datasets, and sink only the "processed" ones that have a match
>>>>>>>>>>    from step c
>>>>>>>>>>    - e) Persist the resulting dataset
>>>>>>>>>>
>>>>>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>>>>>> intersection get lost afterwards...
>>>>>>>>>>
>>>>>>>>>> So my question is:
>>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT
>>>>>>>>>> restarting the app?
>>>>>>>>>> - For this scenario, should I fall back to Spark Streaming,
>>>>>>>>>> instead of Structured Streaming?
>>>>>>>>>>
>>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and
>>>>>>>>>> managed to commit only the ones in the microbatches that were fully
>>>>>>>>>> successful (still failed to find a way to "poll" for the uncommitted
>>>>>>>>>> ones without restarting, though).
>>>>>>>>>>
>>>>>>>>>> Thank you very much in advance!
>>>>>>>>>>
>>>>>>>>>>
