spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: [Spark Structured Streaming] retry/replay failed messages
Date Fri, 09 Jul 2021 19:51:39 GMT
One second

The topic called transactions_processed is streaming through Spark. The
matching transactions-created events are created at the same time (the same
timestamp), but you have NOT received them yet and they don't exist in your
DB, presumably because your relational database is too slow to ingest them?
So you do a query in Postgres for, say, transaction_id 3 and it doesn't
exist yet? When are those events expected to arrive?
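
If the two topics are consumed in the same Spark job, one option worth
exploring is a stream-stream inner join on transaction_id with watermarks,
so Spark itself buffers the processed events in state until the matching
created event arrives. A rough sketch in PySpark follows; the broker
address, schemas and the 3-day watermark horizon are my assumptions, not
taken from your setup:

```python
def build_joined_stream(spark, brokers="broker:9092"):
    """Sketch: join transactions-processed with transactions-created,
    letting Spark buffer unmatched rows in state for up to 3 days.
    Broker address, schemas and watermark horizon are assumptions."""
    from pyspark.sql.functions import from_json, col, expr
    from pyspark.sql.types import (StructType, StructField,
                                   IntegerType, TimestampType)

    created_schema = StructType([
        StructField("transaction_id", IntegerType()),
        StructField("amount", IntegerType()),
        StructField("timestamp", TimestampType()),
    ])
    processed_schema = StructType([
        StructField("transaction_id", IntegerType()),
        StructField("timestamp", TimestampType()),
    ])

    def read_topic(topic, schema):
        # one Kafka source per topic, JSON value parsed into columns
        return (spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", brokers)
                .option("subscribe", topic)
                .option("startingOffsets", "earliest")
                .load()
                .select(from_json(col("value").cast("string"),
                                  schema).alias("v"))
                .select("v.*"))

    created = (read_topic("transactions-created", created_schema)
               .withColumnRenamed("timestamp", "created_ts")
               .withWatermark("created_ts", "3 days"))
    processed = (read_topic("transactions-processed", processed_schema)
                 .withColumnRenamed("timestamp", "processed_ts")
                 .withWatermark("processed_ts", "3 days"))

    # unmatched 'processed' rows stay in Spark state until the watermark
    # expires, so a 'created' event arriving days later can still match
    return processed.alias("p").join(
        created.alias("c"),
        expr("""p.transaction_id = c.transaction_id AND
                created_ts BETWEEN processed_ts - interval 1 hour
                               AND processed_ts + interval 1 hour"""))
```

Note this hinges on the job running long enough for the late created events
to land inside the watermark horizon; anything later is dropped from state.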




   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira <bruno.ariev@gmail.com> wrote:

> Thanks for the quick reply!
>
> I'm not sure I got the idea correctly... but from what I'm understanding,
> wouldn't that actually end up the same way?
> Because this is the current scenario:
>
> *transactions-processed: *
> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>
> *transactions-created:*
> { "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
> { "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 12:02:00" }
>
> - So, when I fetch ALL messages from both topics, there are still 2
> transactions (ids "*3*" and "*4*") which do *not* exist in the topic
> "transactions-created" yet (and they aren't in Postgres either)
> - But since they were already pulled by Structured Streaming, they'll be
> marked as "processed" by the Spark Structured Streaming checkpoint anyway.
>
> And therefore, I can't replay/reprocess them again...
>
> Is my understanding correct? Am I missing something here?
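>
> (Side note on the checkpoint point: the Kafka source tracks consumed
> offsets in the query's checkpoint rather than in Kafka group commits, so
> one way to re-read already-fetched messages is to start the query with a
> fresh checkpointLocation and explicit startingOffsets. A sketch of the
> option values; the partition ids and offsets below are made up:)

```python
import json

# explicit per-partition starting offsets for the Kafka source;
# -2 means "earliest", -1 "latest" (partition ids/offsets are made up)
starting_offsets = json.dumps(
    {"transactions-processed": {"0": -2, "1": 1500}})

reader_options = {
    "kafka.bootstrap.servers": "broker:9092",   # assumption
    "subscribe": "transactions-processed",
    "startingOffsets": starting_offsets,
}

# startingOffsets only applies when the query starts with no prior
# checkpoint, so pair it with a fresh checkpointLocation on writeStream
```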
>
> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> Thanks for the details.
>>
>> Can you read these in the same app? For example, the following is
>> PySpark, but it serves the purpose.
>>
>> Read topic "newtopic" in one micro-batch and the other topic "md" in
>> another micro-batch:
>>
>>         try:
>>             # process topic --> newtopic
>>             streamingNewtopic = self.spark \
>>                 .readStream \
>>                 .format("kafka") \
>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>                 .option("group.id", config['common']['newtopic']) \
>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>                 .option("subscribe", config['MDVariables']['newtopic']) \
>>                 .option("failOnDataLoss", "false") \
>>                 .option("includeHeaders", "true") \
>>                 .option("startingOffsets", "latest") \
>>                 .load() \
>>                 .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>>
>>             # construct a streaming dataframe streamingDataFrame that subscribes
>>             # to topic config['MDVariables']['topic'] -> md (market data)
>>             streamingDataFrame = self.spark \
>>                 .readStream \
>>                 .format("kafka") \
>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>                 .option("group.id", config['common']['appName']) \
>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>                 .option("subscribe", config['MDVariables']['topic']) \
>>                 .option("failOnDataLoss", "false") \
>>                 .option("includeHeaders", "true") \
>>                 .option("startingOffsets", "latest") \
>>                 .load() \
>>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>
>>             streamingNewtopic.printSchema()
>>
>>             # Now do a writeStream and call the relevant functions to process dataframes
>>             newtopicResult = streamingNewtopic.select(
>>                      col("newtopic_value.uuid").alias("uuid")
>>                    , col("newtopic_value.timeissued").alias("timeissued")
>>                    , col("newtopic_value.queue").alias("queue")
>>                    , col("newtopic_value.status").alias("status")). \
>>                      writeStream. \
>>                      outputMode('append'). \
>>                      option("truncate", "false"). \
>>                      foreachBatch(sendToControl). \
>>                      trigger(processingTime='2 seconds'). \
>>                      queryName(config['MDVariables']['newtopic']). \
>>                      start()
>>
>>             result = streamingDataFrame.select(
>>                      col("parsed_value.rowkey").alias("rowkey")
>>                    , col("parsed_value.ticker").alias("ticker")
>>                    , col("parsed_value.timeissued").alias("timeissued")
>>                    , col("parsed_value.price").alias("price")). \
>>                      writeStream. \
>>                      outputMode('append'). \
>>                      option("truncate", "false"). \
>>                      foreachBatch(sendToSink). \
>>                      trigger(processingTime='30 seconds'). \
>>                      option('checkpointLocation', checkpoint_path). \
>>                      queryName(config['MDVariables']['topic']). \
>>                      start()
>>             print(result)
>>
>>         except Exception as e:
>>             print(f"""{e}, quitting""")
>>             sys.exit(1)
>>
>> Inside that function, say *sendToSink*, you can get the df and batchId:
>>
>> def sendToSink(df, batchId):
>>     if len(df.take(1)) > 0:
>>         print(f"""md batchId is {batchId}""")
>>         df.show(100, False)
>>         df.persist()
>>         # write to BigQuery batch table
>>         s.writeTableToBQ(df, "append",
>>             config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>>         df.unpersist()
>>         print(f"""wrote to DB""")
>>     else:
>>         print("DataFrame md is empty")
>>
>> And similarly you have created a DataFrame from the other topic newtopic:
>>
>> def sendToControl(dfnewtopic, batchId):
>>     if len(dfnewtopic.take(1)) > 0:
>>         ......
>>
>> Now you have two dataframes, *df* and *dfnewtopic*, in the same session.
>> Will you be able to join these two dataframes through a common key value?
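>>
>> (For illustration, such a join could look like the sketch below; the key
>> name transaction_id is an assumption about what the two topics share:)

```python
def join_on_key(df, dfnewtopic, key="transaction_id"):
    """Sketch: inner-join the two DataFrames on their common key,
    keeping a single copy of the key column (key name is an assumption)."""
    return df.join(dfnewtopic, on=key, how="inner")
```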
>>
>> HTH
>>
>>
>>
>>
>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ariev@gmail.com>
>> wrote:
>>
>>> Hello! Sure thing!
>>>
>>> I'm reading them *separately*, both are apps written with Scala + Spark
>>> Structured Streaming.
>>>
>>> I feel like I missed some details on my original thread (sorry it was
>>> past 4 AM) and it was getting frustrating
>>> Please let me try to clarify some points:
>>>
>>> *Transactions Created Consumer*
>>> Kafka trx-created-topic <--- (Scala + Spark Structured Streaming)
>>> ConsumerApp ---> sinks to ---> Postgres DB table (Transactions)
>>>
>>> *Transactions Processed Consumer*
>>> Kafka trx-processed-topic and the Postgres Trx table:
>>> 1) (Scala + Spark Structured Streaming) AnotherConsumerApp fetches a
>>> Dataset from the topic (let's call it "a")
>>> 2) Selects the ids
>>> 3) Fetches from the Postgres Trx table the rows with matching ids that
>>> have status 'created' (let's call it "b")
>>> 4) Performs an intersection between "a" and "b", resulting in
>>> "b_that_needs_sinking" (but now there are some "b_leftovers" that were
>>> left out of the intersection)
>>> 5) Sinks "b_that_needs_sinking" to the DB, which leaves the
>>> "b_leftovers" unprocessed (not persisted)
>>> 6) However, those "b_leftovers" would ultimately be processed at some
>>> point (even if it takes 1-3 days), when their corresponding
>>> transaction_ids are pushed to the "trx-created-topic" Kafka topic and
>>> are then processed by that first consumer
>>>
>>> So, what I'm trying to accomplish is to find a way to reprocess those
>>> "b_leftovers" *without* having to restart the app.
>>> Does that make sense?
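>>>
>>> (One pattern that might fit, sketched below under assumptions about the
>>> table, topic and column names: inside foreachBatch, split the batch into
>>> matched rows and leftovers, persist the matched ones, and publish the
>>> leftovers to a retry topic the same job also subscribes to, so they come
>>> back around on a later micro-batch instead of being checkpointed away:)

```python
def sink_with_retry(batch_df, batch_id, spark, jdbc_url, brokers):
    """Sketch: persist matched rows, send unmatched ones back to a retry
    topic so they are re-consumed later without an app restart.
    Table/topic names, jdbc_url and the key column are assumptions."""
    from pyspark.sql.functions import to_json, struct

    # ids already in Postgres with status CREATED, read once per batch
    created = (spark.read.format("jdbc")
               .option("url", jdbc_url)
               .option("query",
                       "select transaction_id from transactions "
                       "where status = 'CREATED'")
               .load())

    matched = batch_df.join(created, "transaction_id", "inner")
    leftovers = batch_df.join(created, "transaction_id", "left_anti")

    # persist the 'processed' rows that found their 'created' match
    (matched.write.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", "transactions_processed")
        .mode("append").save())

    # push the leftovers back onto a retry topic for a later pass
    (leftovers.select(to_json(struct("*")).alias("value"))
        .write.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("topic", "transactions-processed-retry")
        .save())
```

The consumer would then subscribe to both transactions-processed and the
retry topic; a retry-counter column can cap how many loops a message makes.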
>>>
>>> PS: It doesn't necessarily have to be real streaming, if micro-batching
>>> (legacy Spark Streaming) would allow such a thing, it would technically
>>> work (although I keep hearing it's not advisable)
>>>
>>> Thank you so much!
>>>
>>> Kind regards
>>>
>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Can you please clarify if you are reading these two topics separately
>>>> or within the same scala or python script in Spark Structured Streaming?
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ariev@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello guys,
>>>>>
>>>>> I've been struggling with this for some days now, without success, so
>>>>> I would highly appreciate any enlightenment. The simplified scenario is
>>>>> the following:
>>>>>
>>>>>    - I've got 2 topics in Kafka (it's already like that in
>>>>>    production, can't change it)
>>>>>       - transactions-created,
>>>>>       - transaction-processed
>>>>>    - Even though the schema is not exactly the same, they all share a
>>>>>    correlation_id, which is their "transaction_id"
>>>>>
>>>>> So, long story short, I've got 2 consumers, one for each topic, and
>>>>> all I wanna do is sink them in a chain order. I'm writing them w/ Spark
>>>>> Structured Streaming, btw
>>>>>
>>>>> So far so good, the caveat here is:
>>>>>
>>>>> - I cannot write a given "*processed" *transaction unless there is an
>>>>> entry of that same transaction with the status "*created*".
>>>>>
>>>>> - There is *no* guarantee that any transactions in the topic
>>>>> "transaction-*processed*" have a match (same transaction_id) in the
>>>>> "transaction-*created*" at the moment the messages are fetched.
>>>>>
>>>>> So the workflow so far is:
>>>>> - Msgs from the "transaction-created" just get synced to postgres, no
>>>>> questions asked
>>>>>
>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>
>>>>>    - a) Messages are fetched from the Kafka topic
>>>>>    - b) Select the transaction_id of those...
>>>>>    - c) Fetch all the rows w/ the corresponding id from a Postgres
>>>>>    table AND that have the status "CREATED"
>>>>>    - d) Then pretty much do an intersection between the two
>>>>>    datasets, and sink only the "processed" ones that have a match from step c
>>>>>    - e) Persist the resulting dataset
>>>>>
>>>>> But the rows (from the 'processed') that were not part of the
>>>>> intersection get lost afterwards...
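>>>>>
>>>>> (To pin down step d and the leftover set in the simplest terms, as
>>>>> plain Python sets with made-up ids:)

```python
# ids seen on transaction-processed in one micro-batch (made up)
processed_ids = {1, 2, 3, 4}
# ids present in Postgres with status CREATED at that moment
created_ids = {1, 2}

persist_now = processed_ids & created_ids   # steps d/e: safe to sink now
leftovers = processed_ids - created_ids     # lost once the batch commits
```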
>>>>>
>>>>> So my question is:
>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting
>>>>> the app?
>>>>> - For this scenario, should I fall back to Spark Streaming, instead of
>>>>> Structured Streaming?
>>>>>
>>>>> PS: I was playing around with Spark Streaming (legacy) and managed to
>>>>> commit only the ones in the microbatches that were fully successful (still
>>>>> failed to find a way to "poll" for the uncommitted ones without restarting,
>>>>> though).
>>>>>
>>>>> Thank you very much in advance!
>>>>>
>>>>>
