spark-user mailing list archives

From Bruno Oliveira <bruno.ar...@gmail.com>
Subject Re: [Spark Structured Streaming] retry/replay failed messages
Date Fri, 09 Jul 2021 18:12:01 GMT
Thanks for the quick reply!

I'm not sure I got the idea correctly... but from what I'm understanding,
wouldn't that actually end up the same way?
Because this is the current scenario:

*transactions-processed: *
{ "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
{ "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }

*transactions-created:*
{ "transaction_id": 1, "amount": 1000, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "amount": 2000, "timestamp": "2020-04-04 12:02:00" }

- So, when I fetch ALL messages from both topics, there are still 2x
transactions (ids "*3*" and "*4*") which do *not* exist in the topic
"transactions-created" yet (and they aren't in Postgres either).
- But since they were already pulled by Structured Streaming, they'll
effectively be marked as "processed" by the Spark Structured Streaming
checkpoint anyway.

And therefore, I can't replay/reprocess them again...

Is my understanding correct? Am I missing something here?
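(One thing worth noting on the checkpoint question above: the checkpoint only pins the offsets of the *streaming* query itself; Kafka still retains the messages, so a separate one-off *batch* read can re-fetch them by passing explicit offsets. A minimal helper sketch, assuming Spark's documented Kafka-source offset JSON shape `{"topic": {"partition": offset}}`; the partition/offset numbers in the comment are made up for illustration:)

```python
import json

def starting_offsets(topic: str, partition_offsets: dict) -> str:
    """Build the JSON string Spark's Kafka batch source accepts for the
    startingOffsets / endingOffsets options: {"topic": {"partition": offset}}."""
    return json.dumps({topic: {str(p): int(o) for p, o in partition_offsets.items()}})

# Replay the unmatched transactions with a one-off batch read (sketch):
#   spark.read.format("kafka")
#        .option("kafka.bootstrap.servers", "...")
#        .option("subscribe", "transactions-processed")
#        .option("startingOffsets", starting_offsets("transactions-processed", {0: 2}))
#        .load()
print(starting_offsets("transactions-processed", {0: 2}))
```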

On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Thanks for the details.
>
> Can you read these in the same app? For example, this is PySpark, but it
> serves the purpose.
>
> Read topic "newtopic" in one micro-batch and the other topic "md" in
> another micro-batch:
>
>         try:
>             # process topic --> newtopic
>             streamingNewtopic = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['newtopic']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['newtopic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), newtopicSchema).alias("newtopic_value"))
>
>             # construct a streaming dataframe streamingDataFrame that
>             # subscribes to topic config['MDVariables']['topic'] -> md (market data)
>             streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>             streamingNewtopic.printSchema()
>
>             # Now do a writeStream and call the relevant functions to
>             # process the dataframes
>             newtopicResult = streamingNewtopic.select(
>                      col("newtopic_value.uuid").alias("uuid")
>                    , col("newtopic_value.timeissued").alias("timeissued")
>                    , col("newtopic_value.queue").alias("queue")
>                    , col("newtopic_value.status").alias("status")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToControl). \
>                      trigger(processingTime='2 seconds'). \
>                      queryName(config['MDVariables']['newtopic']). \
>                      start()
>
>             result = streamingDataFrame.select(
>                      col("parsed_value.rowkey").alias("rowkey")
>                    , col("parsed_value.ticker").alias("ticker")
>                    , col("parsed_value.timeissued").alias("timeissued")
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToSink). \
>                      trigger(processingTime='30 seconds'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName(config['MDVariables']['topic']). \
>                      start()
>             print(result)
>
>         except Exception as e:
>             print(f"""{e}, quitting""")
>             sys.exit(1)
>
> Inside that function, say *sendToSink*, you can get the df and the batchId:
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame md is empty")
>
> And you have the dataframe created from the other topic *newtopic*:
>
> def sendToControl(dfnewtopic, batchId):
>     if len(dfnewtopic.take(1)) > 0:
>         ......
>
> Now you have two dataframes, *df* and *dfnewtopic*, in the same session.
> Will you be able to join these two dataframes on a common key value?
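(Editorially, the join suggested here can be illustrated without Spark: inside a foreachBatch call both sides are plain batch datasets, so an inner join on the shared key keeps only the matched rows. A minimal stand-in using Python dicts rather than Spark DataFrames; the field name `transaction_id` comes from this thread, the rest is illustrative:)

```python
def join_on_key(processed, created, key="transaction_id"):
    """Inner-join two batches of records on a shared key, mimicking
    dfProcessed.join(dfCreated, key, "inner") inside a foreachBatch call."""
    created_by_id = {row[key]: row for row in created}
    return [
        # merge the two records; rows without a "created" match are dropped
        {**row, **created_by_id[row[key]]}
        for row in processed
        if row[key] in created_by_id
    ]

processed = [{"transaction_id": 1}, {"transaction_id": 3}]
created = [{"transaction_id": 1, "amount": 1000}]
print(join_on_key(processed, created))
# → [{'transaction_id': 1, 'amount': 1000}]
```

Note that transaction 3 falls out of the join entirely, which is exactly the "leftover" situation discussed in this thread.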
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira <bruno.ariev@gmail.com> wrote:
>
>> Hello! Sure thing!
>>
>> I'm reading them *separately*, both are apps written with Scala + Spark
>> Structured Streaming.
>>
>> I feel like I missed some details in my original thread (sorry, it was
>> past 4 AM) and it was getting frustrating.
>> Please let me try to clarify some points:
>>
>> *Transactions Created Consumer*
>> -----------------------------------
>> | Kafka trx-created-topic   |  <--- (Scala + Spark Structured Streaming)
>> ConsumerApp ---> Sinks to ---> Postgres DB Table (Transactions)
>> -----------------------------------
>>
>> *Transactions Processed Consumer*
>> -------------------------------------
>> | Kafka trx-processed-topic |  <--- 1) (Scala + Spark Structured Streaming)
>> -------------------------------------   AnotherConsumerApp fetches a Dataset
>>                                         (let's call it "a")
>>                                      2) Selects the ids
>> -------------------------------------
>> |   Postgres / Trx table    |  <--- 3) Fetches the rows w/ the matching ids
>> -------------------------------------   that have status "created" (let's
>>                                         call it "b")
>>                                      4) Performs an intersection between "a"
>>                                         and "b", resulting in
>>                                         "b_that_needs_sinking" (but now there
>>                                         are some "b_leftovers" that were left
>>                                         out of the intersection)
>>                                      5) Sinks "b_that_needs_sinking" to the
>>                                         DB, which leaves the "b_leftovers"
>>                                         unprocessed (not persisted)
>>                                      6) However, those "b_leftovers" would
>>                                         ultimately be processed at some point
>>                                         (even if it takes 1-3 days), when
>>                                         their corresponding transaction_ids
>>                                         are pushed to the "trx-created-topic"
>>                                         Kafka topic and then processed by
>>                                         that first consumer
>>
>> So, what I'm trying to accomplish is to find a way to reprocess those
>> "b_leftovers" *without* having to restart the app.
>> Does that make sense?
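(The reprocessing described above can be done without a restart if the app itself carries the unmatched rows forward: on each micro-batch, merge the new arrivals with the previous leftovers, sink what now matches, and keep the rest for the next batch. A sketch of that bookkeeping in plain Python; the names follow the thread, and in a real Spark app this state would more likely live in an external store than in memory:)

```python
def process_batch(incoming, created_ids, leftovers):
    """One micro-batch step: try to sink both the new rows and the
    previously unmatched leftovers; return (to_sink, new_leftovers)."""
    candidates = leftovers + incoming
    to_sink = [r for r in candidates if r["transaction_id"] in created_ids]
    new_leftovers = [r for r in candidates if r["transaction_id"] not in created_ids]
    return to_sink, new_leftovers

# Batch 1: ids 3 and 4 have no "created" match yet -> both become leftovers
sink1, left1 = process_batch(
    [{"transaction_id": 3}, {"transaction_id": 4}], created_ids={1, 2}, leftovers=[])
# Batch 2: id 3 has now been created, so that leftover finally sinks
sink2, left2 = process_batch([], created_ids={1, 2, 3}, leftovers=left1)
print(sink1, left1, sink2, left2)
```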
>>
>> PS: It doesn't necessarily have to be real streaming; if micro-batching
>> (legacy Spark Streaming) would allow such a thing, it would technically
>> work (although I keep hearing it's not advisable).
>>
>> Thank you so much!
>>
>> Kind regards
>>
>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Can you please clarify if you are reading these two topics separately or
>>> within the same scala or python script in Spark Structured Streaming?
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <bruno.ariev@gmail.com>
>>> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> I've been struggling with this for some days now, without success, so I
>>>> would highly appreciate any enlightenment. The simplified scenario is the
>>>> following:
>>>>
>>>>    - I've got 2 topics in Kafka (it's already like that in production,
>>>>    can't change it):
>>>>       - transactions-created
>>>>       - transactions-processed
>>>>    - Even though the schema is not exactly the same, they all share a
>>>>    correlation_id, which is their "transaction_id"
>>>>
>>>> So, long story short, I've got 2 consumers, one for each topic, and all
>>>> I wanna do is sink them in a chain order. I'm writing them w/ Spark
>>>> Structured Streaming, btw
>>>>
>>>> So far so good, the caveat here is:
>>>>
>>>> - I cannot write a given "*processed" *transaction unless there is an
>>>> entry of that same transaction with the status "*created*".
>>>>
>>>> - There is *no* guarantee that any transaction in the topic
>>>> "transactions-*processed*" has a match (same transaction_id) in
>>>> "transactions-*created*" at the moment the messages are fetched.
>>>>
>>>> So the workflow so far is:
>>>> - Msgs from the "transaction-created" just get synced to postgres, no
>>>> questions asked
>>>>
>>>> - As for the "transaction-processed", it goes as follows:
>>>>
>>>>    - a) Messages are fetched from the Kafka topic
>>>>    - b) Select the transaction_id of those...
>>>>    - c) Fetch all the rows w/ the corresponding id from a Postgres
>>>>    table AND that have the status "CREATED"
>>>>    - d) Then, pretty much do an intersection between the two
>>>>    datasets, and sink only the "processed" ones that have a match from step c
>>>>    - e) Persist the resulting dataset
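(Step c above amounts to a parameterized lookup against Postgres. A hedged sketch of that query construction; the table and column names follow this thread and are assumptions, and in real code the placeholders should be bound through the driver, e.g. psycopg2, rather than string-formatted:)

```python
def created_rows_query(ids):
    """Build a parameterized query for step c: rows whose id is in the
    fetched batch AND whose status is CREATED."""
    placeholders = ", ".join(["%s"] * len(ids))
    sql = (f"SELECT * FROM transactions "
           f"WHERE transaction_id IN ({placeholders}) AND status = 'CREATED'")
    return sql, list(ids)

sql, params = created_rows_query([1, 2, 3])
print(sql)
# → SELECT * FROM transactions WHERE transaction_id IN (%s, %s, %s) AND status = 'CREATED'
```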
>>>>
>>>> But the rows (from the 'processed') that were not part of the
>>>> intersection get lost afterwards...
>>>>
>>>> So my question is:
>>>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting
>>>> the app?
>>>> - For this scenario, should I fall back to Spark Streaming, instead of
>>>> Structured Streaming?
>>>>
>>>> PS: I was playing around with Spark Streaming (legacy) and managed to
>>>> commit only the ones in the microbatches that were fully successful (still
>>>> failed to find a way to "poll" for the uncommitted ones without restarting,
>>>> though).
>>>>
>>>> Thank you very much in advance!
>>>>
>>>>
