spark-user mailing list archives

From Bruno Oliveira <bruno.ar...@gmail.com>
Subject Re: [Spark Structured Streaming] retry/replay failed messages
Date Fri, 09 Jul 2021 20:43:05 GMT
I'm terribly sorry, Mich. That was my mistake.
The timestamps are not the same (I copied and pasted without realizing it; I'm
really sorry for the confusion).

Please assume NONE of the following transactions are in the database yet

*transactions-created:*
{ "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 08:02:00" }

*transactions-processed:*
{ "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" }     // processed 2 minutes after it was created
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }     // processed 4 hours after it was created
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }     // cannot be persisted into the DB yet, because "transaction_id 3" with the status "CREATED" does NOT exist in the DB


*(...) Transactions-created are created at the same time (the same timestamp)
but you have NOT received them and they don't yet exist in your DB (...)*

- Not at the same timestamp; that was my mistake.
- Imagine two transactions with the same ID (neither of them is in any
Kafka topic yet):

   - One with the status CREATED, and another with the status PROCESSED,
   - The one with the status PROCESSED will ALWAYS have a greater
   timestamp than the one with the status CREATED,
   - Now, for whatever reason, this happens:
      - Step a) some producer *fails* to push the *created* one to the
      topic *transactions-created*; it will RETRY, and will eventually
      succeed, but that can take minutes, or even hours,
      - Step b) however, the producer *succeeds* in pushing the
      *processed* one to the topic *transactions-processed*.


*(...) because presumably your relational database is too slow to ingest
them? (...)*


- It's not that the DB was slow; it's that the message for
transaction_id 3 hadn't arrived at the *transactions-created* topic yet, due
to some error/failure in Step a), for example.


*(...) you do a query in Postgres for say transaction_id 3 but they don't
exist yet? When are they expected to arrive? (...)*


- That's correct. It could take minutes, maybe hours. But it is guaranteed
that at some point in the future they will arrive. I just have to keep
trying until it works, i.e. until this transaction_id 3 with the status
CREATED arrives at the database.


Huge apologies for the confusion... Is it a bit clearer now?

*PS:* This is a simplified scenario; in practice, there is yet another
topic for "transactions-refunded", which cannot be sunk to the DB
unless the same transaction_id with the status "PROCESSED" is there (and,
again, there can only be a PROCESSED transaction_id if the same
transaction_id with CREATED exists in the DB).
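To make the ordering constraint concrete, here is a minimal plain-Python sketch of the gate described above (no Spark involved; `db`, `PREDECESSOR`, and `can_sink` are illustrative names I made up, not an existing API):

```python
# Latest status already persisted in the DB, keyed by transaction_id.
db = {1: "CREATED", 2: "PROCESSED"}

# Each status may only be sunk if its predecessor is already in the DB.
PREDECESSOR = {"CREATED": None, "PROCESSED": "CREATED", "REFUNDED": "PROCESSED"}

def can_sink(transaction_id, status):
    """Return True if the message's prerequisite status is already persisted."""
    required = PREDECESSOR[status]
    if required is None:
        return True  # CREATED has no prerequisite
    return db.get(transaction_id) == required

# transaction_id 2 is PROCESSED in the DB, so a refund for it can be sunk;
# transaction_id 3 has no CREATED row yet, so its PROCESSED message must wait.
assert can_sink(2, "REFUNDED")
assert not can_sink(3, "PROCESSED")
```

A message that fails the gate is exactly what has to be kept around and retried later, rather than dropped.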


On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> One second
>
> The topic called transactions_processed is streaming through Spark.
> Transactions-created are created at the same time (the same timestamp) but
> you have NOT received them and they don't yet exist in your DB,
> because presumably your relational database is too slow to ingest them? You
> do a query in Postgres for, say, transaction_id 3 but they don't exist yet?
> When are they expected to arrive?
>
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ariev@gmail.com> wrote:
>
>> Thanks for the quick reply!
>>
>> I'm not sure I got the idea correctly... but from what I'm understanding,
>> wouldn't that actually end up the same way?
>> Because, this is the current scenario:
>>
>> *transactions-processed: *
>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>
>> *transactions-created:*
>> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
>> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 12:02:00" }
>>
>> - So, when I fetch ALL messages from both topics, there are still 2
>> transactions (ids "*3*" and "*4*") which do *not* exist in the topic
>> "transactions-created" yet (and they aren't in Postgres either)
>> - But since they were already pulled by Structured Streaming, they'll
>> effectively be marked as "processed" by the Spark Structured Streaming
>> checkpoint anyway.
>>
>> And therefore, I can't replay/reprocess them again...
>>
>> Is my understanding correct? Am I missing something here?
>>
>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>>> Thanks for the details.
>>>
>>> Can you read these in the same app? For example, this is PySpark, but it
>>> serves the purpose.
>>>
>>> Read topic "newtopic" in one micro-batch and the other topic "md" in
>>> another micro-batch:
>>>
>>>         try:
>>>             # process topic --> newtopic
>>>             streamingNewtopic = self.spark \
>>>                 .readStream \
>>>                 .format("kafka") \
>>>                 .option("kafka.bootstrap.servers",
>>> config['MDVariables']['bootstrapServers'],) \
>>>                 .option("schema.registry.url",
>>> config['MDVariables']['schemaRegistryURL']) \
>>>                 .option("group.id", config['common']['newtopic']) \
>>>                 .option("zookeeper.connection.timeout.ms",
>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>                 .option("rebalance.backoff.ms",
>>> config['MDVariables']['rebalanceBackoffMS']) \
>>>                 .option("zookeeper.session.timeout.ms",
>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>                 .option("auto.commit.interval.ms",
>>> config['MDVariables']['autoCommitIntervalMS']) \
>>>                 .option("subscribe",
>>> config['MDVariables']['newtopic']) \
>>>                 .option("failOnDataLoss", "false") \
>>>                 .option("includeHeaders", "true") \
>>>                 .option("startingOffsets", "latest") \
>>>                 .load() \
>>>                 .select(from_json(col("value").cast("string"),
>>> newtopicSchema).alias("newtopic_value"))
>>>
>>>             # construct a streaming dataframe streamingDataFrame that
>>> subscribes to topic config['MDVariables']['topic']) -> md (market data)
>>>             streamingDataFrame = self.spark \
>>>                 .readStream \
>>>                 .format("kafka") \
>>>                 .option("kafka.bootstrap.servers",
>>> config['MDVariables']['bootstrapServers'],) \
>>>                 .option("schema.registry.url",
>>> config['MDVariables']['schemaRegistryURL']) \
>>>                 .option("group.id", config['common']['appName']) \
>>>                 .option("zookeeper.connection.timeout.ms",
>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>                 .option("rebalance.backoff.ms",
>>> config['MDVariables']['rebalanceBackoffMS']) \
>>>                 .option("zookeeper.session.timeout.ms",
>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>                 .option("auto.commit.interval.ms",
>>> config['MDVariables']['autoCommitIntervalMS']) \
>>>                 .option("subscribe", config['MDVariables']['topic']) \
>>>                 .option("failOnDataLoss", "false") \
>>>                 .option("includeHeaders", "true") \
>>>                 .option("startingOffsets", "latest") \
>>>                 .load() \
>>>                 .select(from_json(col("value").cast("string"),
>>> schema).alias("parsed_value"))
>>>
>>>
>>>             streamingNewtopic.printSchema()
>>>
>>>             # Now do a writeStream and call the relevant functions to
>>> process dataframes
>>>
>>>             newtopicResult = streamingNewtopic.select( \
>>>                      col("newtopic_value.uuid").alias("uuid") \
>>>                    ,
>>> col("newtopic_value.timeissued").alias("timeissued") \
>>>                    , col("newtopic_value.queue").alias("queue") \
>>>                    , col("newtopic_value.status").alias("status")). \
>>>                      writeStream. \
>>>                      outputMode('append'). \
>>>                      option("truncate", "false"). \
>>>                      foreachBatch(sendToControl). \
>>>                      trigger(processingTime='2 seconds'). \
>>>                      queryName(config['MDVariables']['newtopic']). \
>>>                      start()
>>>
>>>             result = streamingDataFrame.select( \
>>>                      col("parsed_value.rowkey").alias("rowkey") \
>>>                    , col("parsed_value.ticker").alias("ticker") \
>>>                    , col("parsed_value.timeissued").alias("timeissued") \
>>>                    , col("parsed_value.price").alias("price")). \
>>>                      writeStream. \
>>>                      outputMode('append'). \
>>>                      option("truncate", "false"). \
>>>                      foreachBatch(sendToSink). \
>>>                      trigger(processingTime='30 seconds'). \
>>>                      option('checkpointLocation', checkpoint_path). \
>>>                      queryName(config['MDVariables']['topic']). \
>>>                      start()
>>>             print(result)
>>>
>>>         except Exception as e:
>>>                 print(f"""{e}, quitting""")
>>>                 sys.exit(1)
>>>
>>> Inside that function, say *sendToSink*, you can get the df and the batchId:
>>>
>>> def sendToSink(df, batchId):
>>>     if(len(df.take(1))) > 0:
>>>         print(f"""md batchId is {batchId}""")
>>>         df.show(100,False)
>>>         df.persist()
>>>         # write to BigQuery batch table
>>>         s.writeTableToBQ(df, "append",
>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>>         df.unpersist()
>>>         print(f"""wrote to DB""")
>>>     else:
>>>         print("DataFrame md is empty")
>>>
>>> And you have the DF created from the other topic, newtopic:
>>>
>>> def sendToControl(dfnewtopic, batchId):
>>>     if(len(dfnewtopic.take(1))) > 0:
>>>         ......
>>>
>>> Now you have two dataframes, *df* and *dfnewtopic*, in the same session.
>>> Will you be able to join these two dataframes through a common key value?
>>>
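That join can be sketched in plain Python, with lists of dicts standing in for the two micro-batch DataFrames; in PySpark itself it would be roughly `df.join(dfnewtopic, on="uuid")` inside a shared foreachBatch handler. Field names and values below are illustrative, not from either topic's real schema:

```python
# Two micro-batch outputs modeled as lists of dicts.
md = [
    {"uuid": "a1", "ticker": "XYZ", "price": 10.5},
    {"uuid": "b2", "ticker": "ABC", "price": 20.0},
]
newtopic = [
    {"uuid": "a1", "status": "OK"},
    {"uuid": "c3", "status": "OK"},
]

# Inner join on the common key: only uuids present on both sides survive.
by_uuid = {row["uuid"]: row for row in newtopic}
joined = [{**m, **by_uuid[m["uuid"]]} for m in md if m["uuid"] in by_uuid]
```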
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ariev@gmail.com>
>>> wrote:
>>>
>>>> Hello! Sure thing!
>>>>
>>>> I'm reading them *separately*; both are apps written with Scala +
>>>> Spark Structured Streaming.
>>>>
>>>> I feel like I missed some details in my original thread (sorry, it was
>>>> past 4 AM and it was getting frustrating).
>>>> Please let me try to clarify some points:
>>>>
>>>> *Transactions Created Consumer*
>>>> -----------------------------------
>>>> | Kafka trx-created-topic |  <--- (Scala + Spark Structured Streaming)
>>>> ConsumerApp ---> sinks to ---> Postgres DB table (Transactions)
>>>> -----------------------------------
>>>>
>>>> *Transactions Processed Consumer*
>>>> -------------------------------------
>>>> | Kafka trx-processed-topic |  plus  | Postgres / Trx table |
>>>> -------------------------------------
>>>> 1) (Scala + Spark Structured Streaming) AnotherConsumerApp fetches a
>>>> Dataset from the topic (let's call it "a")
>>>> 2) Selects the ids
>>>> 3) Fetches the rows w/ the matching ids that have status 'created' from
>>>> the Postgres Trx table (let's call it "b")
>>>> 4) Performs an intersection between "a" and "b", resulting in
>>>> "b_that_needs_sinking" (but now there are some "b_leftovers" that were
>>>> left out of the intersection)
>>>> 5) Sinks "b_that_needs_sinking" to the DB, which leaves the
>>>> "b_leftovers" unprocessed (not persisted)
>>>> 6) However, those "b_leftovers" would, ultimately, be processed at some
>>>> point (even if it takes 1-3 days), when their corresponding
>>>> transaction_ids are pushed to the "trx-created-topic" Kafka topic and
>>>> are then processed by that first consumer
>>>>
>>>> So, what I'm trying to accomplish is to find a way to reprocess those
>>>> "b_leftovers" *without* having to restart the app.
>>>> Does that make sense?
>>>>
>>>> PS: It doesn't necessarily have to be real streaming; if micro-batching
>>>> (legacy Spark Streaming) would allow such a thing, it would technically
>>>> work (although I keep hearing it's not advisable).
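One way to get at reprocessing without a restart, sketched in plain Python: keep the unmatched rows in a retry buffer (in practice this would be a Postgres staging table or a dedicated retry Kafka topic) and fold them into every new micro-batch. `handle_batch` and `retry_buffer` are illustrative names, not a Spark API:

```python
# Retry buffer standing in for a staging table / retry topic.
retry_buffer = []

def handle_batch(batch, created_ids):
    """Sink messages whose transaction_id is already CREATED; re-queue the rest."""
    global retry_buffer
    candidates = retry_buffer + batch   # retry old leftovers alongside new data
    sinkable = [m for m in candidates if m["transaction_id"] in created_ids]
    retry_buffer = [m for m in candidates if m["transaction_id"] not in created_ids]
    return sinkable

# Batch 1: id 3 has no CREATED row yet, so it is buffered, not lost.
assert handle_batch([{"transaction_id": 3}], created_ids={1, 2}) == []
# Later, once id 3's CREATED row lands, the buffered message gets sunk.
assert handle_batch([], created_ids={1, 2, 3}) == [{"transaction_id": 3}]
```

In Structured Streaming the same idea would live inside a foreachBatch handler, so the checkpoint can safely advance: the leftovers survive in the staging store rather than in Kafka offsets.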
>>>>
>>>> Thank you so much!
>>>>
>>>> Kind regards
>>>>
>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Can you please clarify if you are reading these two topics separately
>>>>> or within the same scala or python script in Spark Structured Streaming?
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ariev@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello guys,
>>>>>>
>>>>>> I've been struggling with this for some days now, without success, so
>>>>>> I would highly appreciate any enlightenment. The simplified scenario
>>>>>> is the following:
>>>>>>
>>>>>>    - I've got 2 topics in Kafka (it's already like that in
>>>>>>    production, can't change it)
>>>>>>       - transactions-created,
>>>>>>       - transaction-processed
>>>>>>    - Even though the schema is not exactly the same, they all share
>>>>>>    a correlation_id, which is their "transaction_id"
>>>>>>
>>>>>> So, long story short, I've got 2 consumers, one for each topic, and
>>>>>> all I wanna do is sink them in a chained order. I'm writing them w/
>>>>>> Spark Structured Streaming, btw.
>>>>>>
>>>>>> So far so good, the caveat here is:
>>>>>>
>>>>>> - I cannot write a given "*processed*" transaction unless there is
>>>>>> an entry of that same transaction with the status "*created*".
>>>>>>
>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>> "transaction-*processed*" have a match (same transaction_id) in the
>>>>>> "transaction-*created*" at the moment the messages are fetched.
>>>>>>
>>>>>> So the workflow so far is:
>>>>>> - Msgs from the "transaction-created" topic just get synced to
>>>>>> Postgres, no questions asked
>>>>>>
>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>
>>>>>>    - a) Messages are fetched from the Kafka topic
>>>>>>    - b) Select the transaction_id of those...
>>>>>>    - c) Fetch all the rows w/ the corresponding ids from a Postgres
>>>>>>    table AND that have the status "CREATED"
>>>>>>    - d) Then, pretty much do an intersection between the two
>>>>>>    datasets, and sink only the "processed" ones that have a match
>>>>>>    from step c)
>>>>>>    - e) Persist the resulting dataset
>>>>>>
>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>> intersection get lost afterwards...
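Steps c)-d) and the leftover rows can be modeled in plain Python; in PySpark the same split is arguably a `left_semi` plus a `left_anti` join on transaction_id (the names below are illustrative):

```python
# "a": the fetched "processed" batch; "b_ids": transaction_ids that exist in
# Postgres with status CREATED. In PySpark this maps roughly to
#   a.join(b, "transaction_id", "left_semi")   # the sinkable rows
#   a.join(b, "transaction_id", "left_anti")   # the leftovers
a = [{"transaction_id": i} for i in (1, 2, 3, 4)]
b_ids = {1, 2}

b_that_needs_sinking = [m for m in a if m["transaction_id"] in b_ids]
b_leftovers = [m for m in a if m["transaction_id"] not in b_ids]
```

The anti-join side is precisely what gets lost today, so whatever mechanism is chosen has to persist it somewhere retryable.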
>>>>>>
>>>>>> So my question is:
>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting
>>>>>> the app?
>>>>>> - For this scenario, should I fall back to Spark Streaming, instead
>>>>>> of Structured Streaming?
>>>>>>
>>>>>> PS: I was playing around with Spark Streaming (legacy) and managed to
>>>>>> commit only the ones in the micro-batches that were fully successful
>>>>>> (I still failed to find a way to "poll" for the uncommitted ones
>>>>>> without restarting, though).
>>>>>>
>>>>>> Thank you very much in advance!
>>>>>>
>>>>>>
