spark-user mailing list archives

From ayan guha <guha.a...@gmail.com>
Subject Re: [Spark Structured Streaming] retry/replay failed messages
Date Sat, 10 Jul 2021 00:23:21 GMT
Hi

Option 1: You can write back to the processed queue and add some additional
info, like the last time tried and a retry counter.

Option 2: Load unpaired transactions into a staging table in Postgres. Modify
the streaming job of the "created" flow to try to pair any unpaired trx and
clear it by moving it to the main table. You may need a batch job to delete
unpaired records from the staging table once they pass a certain age.
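A minimal sketch of Option 1 in Python (field names like "retry_count" and the cap of 5 are illustrative assumptions, not from the thread): wrap the unpaired message with retry metadata before writing it back to the queue, so the consumer can back off or give up.

```python
import json
import time

MAX_RETRIES = 5  # hypothetical cap before dead-lettering


def enrich_for_retry(message, now=None):
    """Return the message with retry bookkeeping, or None once retries are spent."""
    now = time.time() if now is None else now
    retries = message.get("retry_count", 0)
    if retries >= MAX_RETRIES:
        return None  # route to a dead-letter store instead of retrying forever
    out = dict(message)
    out["retry_count"] = retries + 1
    out["last_tried_at"] = now
    return out


# Example: first retry of an unpaired transaction
msg = {"transaction_id": 3, "timestamp": "2020-04-04 13:03:00"}
retry_msg = enrich_for_retry(msg, now=1_600_000_000.0)
payload = json.dumps(retry_msg).encode("utf-8")  # value for the Kafka producer
```

The counter lets the consumer distinguish a fresh message from one that has been cycling, and the last-tried timestamp lets it delay re-processing.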

Ayan

On Sat, 10 Jul 2021 at 9:49 am, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> One alternative I can think of is that you publish your orphaned
> transactions to another topic from the main Spark job
>
> You create a new DF based on orphaned transactions
>
> result = orphanedDF \
>             ......
>             .writeStream \
>             .outputMode('append') \
>             .format("kafka") \
>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>             .option("topic", "orphaned") \
>             .option('checkpointLocation', checkpoint_path) \
>             .queryName("orphanedTransactions") \
>             .start()
>
>
> And consume it somewhere else
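The consuming side of this suggestion could be sketched as below (an assumption on my part: topic names, checkpoint path, and the pass-through of raw key/value are illustrative; requires PySpark with the spark-sql-kafka package and a running broker):

```python
def replay_orphans(spark, bootstrap_servers,
                   source_topic="orphaned",
                   target_topic="transactions-processed",
                   checkpoint_path="/tmp/checkpoints/orphan-replay"):
    """Read the orphaned topic and republish the raw key/value back to the
    main topic, so the primary job gets another chance to pair it."""
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("subscribe", source_topic)
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(key AS STRING) AS key",
                        "CAST(value AS STRING) AS value")
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("topic", target_topic)
            .option("checkpointLocation", checkpoint_path)
            .start())
```

The separate checkpoint keeps the replay query's offsets independent of the main job's.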
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Jul 2021 at 00:36, Bruno Oliveira <bruno.ariev@gmail.com>
> wrote:
>
>> I mean... I guess?
>>
>> But I don't really have Airflow here, and I didn't really want to fall
>> back to a "batch"-style approach with Airflow.
>>
>> I'd rather use a Dead Letter Queue approach instead (like I mentioned:
>> another topic for the failed ones, which is later consumed and pumps
>> the messages back to the original topic),
>> or something with Spark + Delta Lake instead...
>>
>> I was just hoping I could retry/replay these "orphaned"
>> transactions somewhat more easily...
>>
>> *Question) *Those features of "Stateful Streaming" or "Continuous
>> Processing" mode wouldn't help solve my case, would they?
>>
>> On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>>> Well, this is a matter of using journal entries.
>>>
>>> What you can do is write those "orphaned" transactions that you cannot
>>> pair through transaction_id to a journal table in your Postgres DB. Then
>>> you can pair them with the entries in the relevant Postgres table. If it
>>> is not time critical, this can be done through a scheduled job every x
>>> minutes via Airflow or something similar, on the database alone.
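The journal-table pairing above could look roughly like this (all table and column names are my own assumptions for illustration; `conn` is a psycopg2-style connection): every x minutes, move journal entries whose transaction_id now has a CREATED row in the main table, then age out the rest.

```python
# Pair journal entries against the main table and clear the paired ones.
PAIR_AND_CLEAR = """
INSERT INTO transactions_processed (transaction_id, processed_at)
SELECT j.transaction_id, j.processed_at
FROM   trx_journal j
JOIN   transactions t
  ON   t.transaction_id = j.transaction_id AND t.status = 'CREATED';

DELETE FROM trx_journal j
USING transactions t
WHERE t.transaction_id = j.transaction_id AND t.status = 'CREATED';
"""

# Drop journal entries that never paired within the retention window.
AGE_OUT = """
DELETE FROM trx_journal
WHERE processed_at < now() - interval '3 days';
"""


def run_pairing(conn):
    """Run both statements in one transaction (conn: a psycopg2 connection)."""
    with conn, conn.cursor() as cur:
        cur.execute(PAIR_AND_CLEAR)
        cur.execute(AGE_OUT)
```

Wrapping both statements in one transaction keeps the pair/clear step atomic, so a crash between them cannot double-sink a row.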
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira <bruno.ariev@gmail.com>
>>> wrote:
>>>
>>>> That is exactly the case, Sebastian!
>>>>
>>>> - In practice, that "created" means "*authorized*", but I still cannot
>>>> deduct anything from the customer balance
>>>> - the "processed" means I can safely deduct the transaction_amount
>>>> from the customer balance,
>>>> - and the "refunded" means I must give the transaction amount back to
>>>> the customer balance
>>>>
>>>> So, technically, we cannot process something that is not "AUTHORIZED"
>>>> (created) yet, nor can we process a refund for a transaction that has NOT
>>>> been PROCESSED yet.
>>>>
>>>>
>>>> *You have an authorisation, then the actual transaction and maybe a
>>>>> refund some time in the future. You want to proceed with a transaction
>>>>> only if you've seen the auth but in an eventually consistent system this
>>>>> might not always happen.*
>>>>
>>>>
>>>> That's absolutely the case! So, yes, that's correct.
>>>>
>>>> *You are asking in the case of receiving the transaction before the
>>>>> auth how to retry later? *
>>>>
>>>>
>>>> Yeah! I've been struggling for days over how to solve this with Spark
>>>> Structured Streaming...
>>>>
>>>> *Right now you are discarding those transactions that didn't match so
>>>>> you instead would need to persist them somewhere and either reinject
>>>>> them into the job that does the lookup (say after x minutes) *
>>>>
>>>>
>>>>
>>>> *Right now, the best I could think of is: *
>>>>
>>>>    - Say, I'm reading the messages w/ transaction_id [1, 2, 3] from
>>>>    Kafka (topic "transactions-processed")
>>>>    - Then I'm querying the database for these IDs that have the status
>>>>    "CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
>>>>    transactions for IDs [1, 2]
>>>>    - So, while it'll work for the ones with IDs [1, 2], I would have
>>>>    to put that transaction_id 3 in another topic, say,
>>>>    "*transaction-processed-retry*"
>>>>    - And write yet another consumer to fetch the messages from that
>>>>    "*transaction-processed-retry*" and put them back to the original
>>>>    topic (transactions-processed)
>>>>    - And do something similar for the transactions-refunded
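The split step at the heart of this plan can be sketched in a few lines (the function name and the retry-topic routing are mine, hypothetical): partition a fetched batch into ids that can be sunk now versus ids bound for the retry topic.

```python
def split_batch(processed_ids, created_ids):
    """Split a batch of 'processed' ids into (sinkable, retry) lists.

    sinkable: ids whose CREATED row already exists in the DB.
    retry: ids with no CREATED counterpart yet; these would go to the
    hypothetical "transaction-processed-retry" topic.
    Order of the input batch is preserved.
    """
    created = set(created_ids)
    sinkable = [i for i in processed_ids if i in created]
    retry = [i for i in processed_ids if i not in created]
    return sinkable, retry


# Example from the thread: ids [1, 2, 3] arrive; the DB has CREATED for [1, 2]
sinkable, retry = split_batch([1, 2, 3], [1, 2])
# sinkable == [1, 2]; retry == [3]
```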
>>>>
>>>> *Q1) *I think this approach may work, but I can't stop thinking I'm
>>>> overengineering this, and was wondering if there isn't a better
>>>> approach...?
>>>>
>>>> *Is this what you are looking for?*
>>>>
>>>>
>>>> Yes, that's exactly it.
>>>>
>>>>
>>>> *Q2)* I know that, under the hood, Structured Streaming is actually
>>>> using the micro-batch engine; if I switched to *Continuous Processing*,
>>>> would it make any difference? Would it allow me any "retry" mechanism
>>>> out of the box?
>>>>
>>>> *Q3)* I stumbled upon *Stateful Streaming* (
>>>> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming),
>>>> but I have never ever used it before;
>>>>         would that actually do something for my case
>>>> (retrying/replaying a given message)?
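What arbitrary stateful processing would buy here, conceptually, is per-transaction_id state that buffers one side of the pair until the other arrives (in Spark this is `mapGroupsWithState`/`flatMapGroupsWithState`, typically with a state timeout). A plain-Python sketch of that state machine (names and shapes are mine, purely illustrative):

```python
class PairingState:
    """Buffer events per transaction_id until both sides are seen."""

    def __init__(self):
        self.pending = {}  # transaction_id -> the buffered counterpart event

    def on_event(self, event):
        """Return a (first, second) pair once both sides arrived, else None."""
        tid = event["transaction_id"]
        other = self.pending.get(tid)
        if other is not None and other["status"] != event["status"]:
            del self.pending[tid]
            return (other, event)   # both sides seen: safe to sink
        self.pending[tid] = event   # buffer until the counterpart arrives
        return None
```

In real stateful streaming you would also register a timeout per key so that entries which never pair get expired to a dead-letter sink instead of growing state forever.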
>>>>
>>>>
>>>> Thank you very VERY much in advance!
>>>> Best regards
>>>>
>>>>
>>>> On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu <sebastian.piu@gmail.com>
>>>> wrote:
>>>>
>>>>> So in payment systems you have something similar I think
>>>>>
>>>>> You have an authorisation, then the actual transaction and maybe a
>>>>> refund some time in the future. You want to proceed with a transaction
>>>>> only if you've seen the auth but in an eventually consistent system
>>>>> this might not always happen.
>>>>>
>>>>> You are asking in the case of receiving the transaction before the
>>>>> auth how to retry later?
>>>>>
>>>>> Right now you are discarding those transactions that didn't match so
>>>>> you instead would need to persist them somewhere and either reinject
>>>>> them into the job that does the lookup (say after x minutes)
>>>>>
>>>>> Is this what you are looking for?
>>>>>
>>>>> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, <bruno.ariev@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm terribly sorry, Mich. That was my mistake.
>>>>>> The timestamps are not the same (I copy&pasted without realizing
>>>>>> that, I'm really sorry for the confusion)
>>>>>>
>>>>>> Please assume NONE of the following transactions are in the database
>>>>>> yet
>>>>>>
>>>>>> *transactions-created:*
>>>>>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>>>>>> 11:01:00" }
>>>>>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>>>>>> 08:02:00" }
>>>>>>
>>>>>> *transactions-processed: *
>>>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" }     // so
>>>>>> it's processed 2 minutes after it was created
>>>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }     // so
>>>>>> it's processed 4 hours after it was created
>>>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }    //
>>>>>> cannot be persisted into the DB yet, because this "transaction_id 3"
>>>>>> with the status "CREATED" does NOT exist in the DB
>>>>>>
>>>>>>
>>>>>> *(...) Transactions-created are created at the same time (the same
>>>>>>> timestamp) but you have NOT received them and they don't yet exist
>>>>>>> in your DB (...)*
>>>>>>
>>>>>> - Not at the same timestamp, that was my mistake.
>>>>>> - Imagine two transactions with the same ID (neither of them is in
>>>>>> any Kafka topic yet),
>>>>>>
>>>>>>    - One with the status CREATED, and another with the status
>>>>>>    PROCESSED,
>>>>>>    - The one with the status PROCESSED will ALWAYS have a
>>>>>>    higher/greater timestamp than the one with the status CREATED
>>>>>>    - Now, for whatever reason, this happens:
>>>>>>       - Step a) some producer *fails* to push the *created* one to
>>>>>>       the topic *transactions-created*; it will RETRY, and will
>>>>>>       eventually succeed, but that can take minutes, or hours
>>>>>>       - Step b) however, the producer *succeeds* in pushing the
>>>>>>       *processed* one to the topic *transactions-processed*
>>>>>>
>>>>>>
>>>>>> *(...) because presumably your relational database is too slow to
>>>>>>> ingest them? (...)*
>>>>>>
>>>>>>
>>>>>> - It's not that the DB was slow; the message for transaction_id 3
>>>>>> hadn't arrived at the *topic-created* yet, due to some error/failure
>>>>>> in Step A, for example
>>>>>>
>>>>>>
>>>>>> *you do a query in Postgres for say transaction_id 3 but they don't
>>>>>>> exist yet? When are they expected to arrive?*
>>>>>>
>>>>>>
>>>>>> - That's correct. It could take minutes, maybe hours. But it is
>>>>>> guaranteed that, at some point in the future, they will arrive. I just
>>>>>> have to keep trying until it works, i.e. until this transaction_id 3
>>>>>> with the status CREATED arrives in the database
>>>>>>
>>>>>>
>>>>>> Huge apologies for the confusion... Is it a bit more clear now?
>>>>>>
>>>>>> *PS:* This is a simplified scenario; in practice, there is yet
>>>>>> another topic for "transactions-refunded", which cannot be sunk to
>>>>>> the DB unless the same transaction_id with the status "PROCESSED" is
>>>>>> there. (But again, there can only be a transaction_id PROCESSED if
>>>>>> the same transaction_id with CREATED exists in the DB.)
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> One second.
>>>>>>>
>>>>>>> The topic called transactions-processed is streaming through Spark.
>>>>>>> Transactions-created are created at the same time (the same
>>>>>>> timestamp) but you have NOT received them and they don't yet exist
>>>>>>> in your DB, because presumably your relational database is too slow
>>>>>>> to ingest them? You do a query in Postgres for, say, transaction_id
>>>>>>> 3 but they don't exist yet? When are they expected to arrive?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ariev@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the quick reply!
>>>>>>>>
>>>>>>>> I'm not sure I got the idea correctly... but from what I'm
>>>>>>>> understanding, wouldn't that actually end up the same way?
>>>>>>>> Because this is the current scenario:
>>>>>>>>
>>>>>>>> *transactions-processed: *
>>>>>>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>>>>>>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>>>>>>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>>>>>>>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>>>>>>>
>>>>>>>> *transactions-created:*
>>>>>>>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>>>>>>>> 11:01:00" }
>>>>>>>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>>>>>>>> 12:02:00" }
>>>>>>>>
>>>>>>>> - So, when I fetch ALL messages from both topics, there are still
>>>>>>>> 2x transactions (ids "*3*" and "*4*") which do *not* exist in the
>>>>>>>> topic "transaction-created" yet (and they aren't in Postgres either)
>>>>>>>> - But since they were pulled by "Structured Streaming" already,
>>>>>>>> they'll be kind of marked as "processed" by the Spark Structured
>>>>>>>> Streaming checkpoint anyway.
>>>>>>>>
>>>>>>>> And therefore, I can't replay/reprocess them again...
>>>>>>>>
>>>>>>>> Is my understanding correct? Am I missing something here?
>>>>>>>>
>>>>>>>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <
>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the details.
>>>>>>>>>
>>>>>>>>> Can you read these in the same app? For example, this is PySpark
>>>>>>>>> but it serves the purpose.
>>>>>>>>>
>>>>>>>>> Read topic "newtopic" in one micro-batch and the other topic "md"
>>>>>>>>> in another micro-batch:
>>>>>>>>>
>>>>>>>>>         try:
>>>>>>>>>             # process topic --> newtopic
>>>>>>>>>             streamingNewtopic = self.spark \
>>>>>>>>>                 .readStream \
>>>>>>>>>                 .format("kafka") \
>>>>>>>>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>>>>>>>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>>>>>>>                 .option("group.id", config['common']['newtopic']) \
>>>>>>>>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>>>                 .option("subscribe", config['MDVariables']['newtopic']) \
>>>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>>>                 .option("includeHeaders", "true") \
>>>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>>>                 .load() \
>>>>>>>>>                 .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>>>>>>>>>
>>>>>>>>>             # construct a streaming dataframe streamingDataFrame that
>>>>>>>>>             # subscribes to topic config['MDVariables']['topic'] --> md (market data)
>>>>>>>>>             streamingDataFrame = self.spark \
>>>>>>>>>                 .readStream \
>>>>>>>>>                 .format("kafka") \
>>>>>>>>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>>>>>>>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>>>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>>>>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>>>                 .option("subscribe", config['MDVariables']['topic']) \
>>>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>>>                 .option("includeHeaders", "true") \
>>>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>>>                 .load() \
>>>>>>>>>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>>>>>>>>
>>>>>>>>>             streamingNewtopic.printSchema()
>>>>>>>>>
>>>>>>>>>             # Now do a writeStream and call the relevant functions
>>>>>>>>>             # to process the dataframes
>>>>>>>>>             newtopicResult = streamingNewtopic.select( \
>>>>>>>>>                      col("newtopic_value.uuid").alias("uuid") \
>>>>>>>>>                    , col("newtopic_value.timeissued").alias("timeissued") \
>>>>>>>>>                    , col("newtopic_value.queue").alias("queue") \
>>>>>>>>>                    , col("newtopic_value.status").alias("status")). \
>>>>>>>>>                      writeStream. \
>>>>>>>>>                      outputMode('append'). \
>>>>>>>>>                      option("truncate", "false"). \
>>>>>>>>>                      foreachBatch(sendToControl). \
>>>>>>>>>                      trigger(processingTime='2 seconds'). \
>>>>>>>>>                      queryName(config['MDVariables']['newtopic']). \
>>>>>>>>>                      start()
>>>>>>>>>
>>>>>>>>>             result = streamingDataFrame.select( \
>>>>>>>>>                      col("parsed_value.rowkey").alias("rowkey") \
>>>>>>>>>                    , col("parsed_value.ticker").alias("ticker") \
>>>>>>>>>                    , col("parsed_value.timeissued").alias("timeissued") \
>>>>>>>>>                    , col("parsed_value.price").alias("price")). \
>>>>>>>>>                      writeStream. \
>>>>>>>>>                      outputMode('append'). \
>>>>>>>>>                      option("truncate", "false"). \
>>>>>>>>>                      foreachBatch(sendToSink). \
>>>>>>>>>                      trigger(processingTime='30 seconds'). \
>>>>>>>>>                      option('checkpointLocation', checkpoint_path). \
>>>>>>>>>                      queryName(config['MDVariables']['topic']). \
>>>>>>>>>                      start()
>>>>>>>>>             print(result)
>>>>>>>>>
>>>>>>>>>         except Exception as e:
>>>>>>>>>             print(f"""{e}, quitting""")
>>>>>>>>>             sys.exit(1)
>>>>>>>>>
>>>>>>>>> Inside that function, say *sendToSink*, you can get the df and
>>>>>>>>> batchId:
>>>>>>>>>
>>>>>>>>> def sendToSink(df, batchId):
>>>>>>>>>     if(len(df.take(1))) > 0:
>>>>>>>>>         print(f"""md batchId is {batchId}""")
>>>>>>>>>         df.show(100,False)
>>>>>>>>>         df.persist()
>>>>>>>>>         # write to BigQuery batch table
>>>>>>>>>         s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>>>>>>>>>         df.unpersist()
>>>>>>>>>         print(f"""wrote to DB""")
>>>>>>>>>     else:
>>>>>>>>>         print("DataFrame md is empty")
>>>>>>>>>
>>>>>>>>> And you have created a DF from the other topic, newtopic:
>>>>>>>>>
>>>>>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>>>>>     if(len(dfnewtopic.take(1))) > 0:
>>>>>>>>>         ......
>>>>>>>>>
>>>>>>>>> Now you have two dataframes, *df* and *dfnewtopic*, in the same
>>>>>>>>> session. Will you be able to join these two dataframes through a
>>>>>>>>> common key value?
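With both topics in one session, one route worth noting (my own addition, not something spelled out in the thread) is Structured Streaming's stream-stream join: a left outer join with watermarks pairs "processed" rows with "created" rows, and once the watermark passes it emits unmatched rows with NULLs, so orphans surface on their own. A sketch, where column names and the 4-hour bound are illustrative assumptions:

```python
def join_condition(max_lag="4 hours"):
    """Time-bounded equi-join condition required for a left outer
    stream-stream join (columns assumed pre-renamed to avoid ambiguity)."""
    return (
        "c_transaction_id = p_transaction_id AND "
        "processed_ts >= created_ts AND "
        f"processed_ts <= created_ts + interval {max_lag}"
    )


def pair_streams(created_df, processed_df):
    """created_df: (c_transaction_id, amount, created_ts)
       processed_df: (p_transaction_id, processed_ts)."""
    from pyspark.sql.functions import expr  # requires pyspark at runtime
    c = created_df.withWatermark("created_ts", "1 hour")
    p = processed_df.withWatermark("processed_ts", "1 hour")
    # After the watermark, rows where amount IS NULL are the orphans.
    return p.join(c, expr(join_condition()), "leftOuter")
```

Note that Spark requires both the watermarks and a time-range condition for an outer stream-stream join; without the time bound the state would grow without limit.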
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ariev@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello! Sure thing!
>>>>>>>>>>
>>>>>>>>>> I'm reading them *separately*; both are apps written with Scala
>>>>>>>>>> + Spark Structured Streaming.
>>>>>>>>>>
>>>>>>>>>> I feel like I missed some details on my original thread (sorry,
>>>>>>>>>> it was past 4 AM) and it was getting frustrating.
>>>>>>>>>> Please let me try to clarify some points:
>>>>>>>>>>
>>>>>>>>>> *Transactions Created Consumer*
>>>>>>>>>> -----------------------------------
>>>>>>>>>> | Kafka trx-created-topic   |  <--- (Scala + Spark Structured
>>>>>>>>>> Streaming) ConsumerApp ---> Sinks to ---> Postgres DB Table
>>>>>>>>>> (Transactions)
>>>>>>>>>> -----------------------------------
>>>>>>>>>>
>>>>>>>>>> *Transactions Processed Consumer*
>>>>>>>>>> -------------------------------------
>>>>>>>>>> | Kafka trx-processed-topic |  <--- 1) (Scala + Spark Structured
>>>>>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a")
>>>>>>>>>> -------------------------------------  2) Selects the ids
>>>>>>>>>> -------------------------------------
>>>>>>>>>> |   Postgres / Trx table    |  <--- 3) Fetches the rows w/ the
>>>>>>>>>> matching ids that have status 'created' (let's call it "b")
>>>>>>>>>> -------------------------------------  4) Performs an intersection
>>>>>>>>>> between "a" and "b", resulting in "b_that_needs_sinking" (but now
>>>>>>>>>> there are some "b_leftovers" that were outside the intersection)
>>>>>>>>>>                                        5) Sinks
>>>>>>>>>> "b_that_needs_sinking" to the DB, but that leaves the
>>>>>>>>>> "b_leftovers" unprocessed (not persisted)
>>>>>>>>>>                                        6) However, those
>>>>>>>>>> "b_leftovers" would ultimately be processed at some point (even if
>>>>>>>>>> it takes 1-3 days), when their corresponding transaction_ids are
>>>>>>>>>> pushed to the "trx-created-topic" Kafka topic and are then
>>>>>>>>>> processed by that first consumer
>>>>>>>>>>
>>>>>>>>>> So, what I'm trying to accomplish is to find a way to reprocess
>>>>>>>>>> those "b_leftovers" *without* having to restart the app.
>>>>>>>>>> Does that make sense?
>>>>>>>>>>
>>>>>>>>>> PS: It doesn't necessarily have to be real streaming; if
>>>>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing,
>>>>>>>>>> it would technically work (although I keep hearing it's not
>>>>>>>>>> advisable)
>>>>>>>>>>
>>>>>>>>>> Thank you so much!
>>>>>>>>>>
>>>>>>>>>> Kind regards
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you please clarify if you are reading these two topics
>>>>>>>>>>> separately or within the same Scala or Python script in Spark
>>>>>>>>>>> Structured Streaming?
>>>>>>>>>>>
>>>>>>>>>>> HTH
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <
>>>>>>>>>>> bruno.ariev@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello guys,
>>>>>>>>>>>>
>>>>>>>>>>>> I've been struggling with this for some days now, without
>>>>>>>>>>>> success, so I would highly appreciate any enlightenment. The
>>>>>>>>>>>> simplified scenario is the following:
>>>>>>>>>>>>
>>>>>>>>>>>>    - I've got 2 topics in Kafka (it's already like that in
>>>>>>>>>>>>    production, can't change it)
>>>>>>>>>>>>       - transactions-created,
>>>>>>>>>>>>       - transaction-processed
>>>>>>>>>>>>    - Even though the schema is not exactly the same, they all
>>>>>>>>>>>>    share a correlation_id, which is their "transaction_id"
>>>>>>>>>>>>
>>>>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic,
>>>>>>>>>>>> and all I wanna do is sink them in a chained order. I'm writing
>>>>>>>>>>>> them w/ Spark Structured Streaming, btw.
>>>>>>>>>>>>
>>>>>>>>>>>> So far so good; the caveat here is:
>>>>>>>>>>>>
>>>>>>>>>>>> - I cannot write a given "*processed*" transaction unless there
>>>>>>>>>>>> is an entry of that same transaction with the status
>>>>>>>>>>>> "*created*".
>>>>>>>>>>>>
>>>>>>>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>>>>>>>> "transaction-*processed*" have a match (same transaction_id) in
>>>>>>>>>>>> the "transaction-*created*" at the moment the messages are
>>>>>>>>>>>> fetched.
>>>>>>>>>>>>
>>>>>>>>>>>> So the workflow so far is:
>>>>>>>>>>>> - Msgs from the "transaction-created" just get synced to
>>>>>>>>>>>> Postgres, no questions asked
>>>>>>>>>>>>
>>>>>>>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>    - a) Messages are fetched from the Kafka topic
>>>>>>>>>>>>    - b) Select the transaction_id of those...
>>>>>>>>>>>>    - c) Fetch all the rows w/ the corresponding id from a
>>>>>>>>>>>>    Postgres table AND that have the status "CREATED"
>>>>>>>>>>>>    - d) Then, pretty much do an intersection between the two
>>>>>>>>>>>>    datasets, and sink only the "processed" ones that have a
>>>>>>>>>>>>    match from step c
>>>>>>>>>>>>    - e) Persist the resulting dataset
>>>>>>>>>>>>
>>>>>>>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>>>>>>>> intersection get lost afterwards...
>>>>>>>>>>>>
>>>>>>>>>>>> So my question is:
>>>>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT
>>>>>>>>>>>> restarting the app?
>>>>>>>>>>>> - For this scenario, should I fall back to Spark Streaming,
>>>>>>>>>>>> instead of Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and
>>>>>>>>>>>> managed to commit only the ones in the micro-batches that were
>>>>>>>>>>>> fully successful (though I still failed to find a way to "poll"
>>>>>>>>>>>> for the uncommitted ones without restarting).
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much in advance!
>>>>>>>>>>>>
--
Best Regards,
Ayan Guha
