spark-user mailing list archives

From Sebastian Piu <sebastian....@gmail.com>
Subject Re: How to generate unique incrementing identifier in a structured streaming dataframe
Date Tue, 13 Jul 2021 20:59:52 GMT
If you want them to survive across jobs you can use snowflake IDs or
similar ideas depending on your use case
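
For illustration, a minimal sketch of the snowflake idea in plain Python. The
bit layout, the custom epoch and the worker_id parameter below are assumptions
for the sketch, not any particular library's format, and it does not handle a
clock that moves backwards:

import threading
import time

class SnowflakeId:
    """Snowflake-style ID: 41-bit millisecond timestamp, 10-bit worker id,
    12-bit per-millisecond sequence. Worker ids must be unique per process
    for the generated IDs to stay globally unique."""
    EPOCH_MS = 1609459200000  # custom epoch (2021-01-01 UTC), arbitrary choice

    def __init__(self, worker_id):
        assert 0 <= worker_id < 1024, "worker_id must fit in 10 bits"
        self.worker_id = worker_id
        self.sequence = 0
        self.last_ms = -1
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now_ms = int(time.time() * 1000)
            if now_ms == self.last_ms:
                self.sequence = (self.sequence + 1) & 0xFFF
                if self.sequence == 0:
                    # sequence exhausted for this millisecond, wait for the next one
                    while now_ms <= self.last_ms:
                        now_ms = int(time.time() * 1000)
            else:
                self.sequence = 0
            self.last_ms = now_ms
            return ((now_ms - self.EPOCH_MS) << 22) | (self.worker_id << 12) | self.sequence

IDs built this way are time-ordered and survive job restarts because the high
bits come from the clock; each executor just needs its own worker_id (for
example derived from the partition id).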

On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, <mich.talebzadeh@gmail.com>
wrote:

> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, and adding that to your dataframe?
>
> If you do Structured Streaming in micro-batch mode, you will get what is
> known as a batchId
>
>            result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToSink). \
>                      trigger(processingTime='30 seconds'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName(config['MDVariables']['topic']). \
>                      start()
>
> That function sendToSink receives two arguments, df and batchId:
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame md is empty")
>
> That batchId value can then be used for each micro-batch.
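>
> For illustration, a rough sketch of pairing that batchId with a per-batch row
> number so every row in the micro-batch gets a label (the sendToSinkWithIds
> name is a placeholder, and ordering by rowkey is just one choice):
>
> from pyspark.sql import functions as F
> from pyspark.sql.window import Window
>
> def sendToSinkWithIds(df, batchId):
>     # a window with no partitionBy pulls the batch into one partition;
>     # fine for modest micro-batches, costly for very large ones
>     w = Window.orderBy(F.col("rowkey"))
>     df = df.withColumn("row_in_batch", F.row_number().over(w)) \
>            .withColumn("batch_id", F.lit(batchId))
>     df.show(10, False)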
>
>
> Otherwise you can do this:
>
>
> from pyspark.sql.functions import monotonically_increasing_id
>
> startval = 1
> df = df.withColumn('id', monotonically_increasing_id() + startval)
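>
> Note that monotonically_increasing_id() only guarantees increasing values,
> not consecutive ones. If you need consecutive numbering that resumes from an
> offset you track yourself, a rough sketch (the add_consecutive_ids helper is
> hypothetical, and this only works inside foreachBatch where df is a static
> micro-batch) is zipWithIndex on the underlying RDD:
>
> from pyspark.sql.types import StructType, StructField, LongType
>
> def add_consecutive_ids(spark, df, startval):
>     # zipWithIndex assigns a dense, 0-based index to every row; adding
>     # startval lets numbering resume from the last ID already in storage
>     rdd = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1] + startval,))
>     schema = StructType(df.schema.fields + [StructField("id", LongType(), False)])
>     return spark.createDataFrame(rdd, schema)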
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
> felixkizhakkeljose@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>> I am wondering if it's possible for me to introduce a uniquely incrementing
>> identifier for each record, as we do in an RDBMS (an incrementing long id)?
>> This would greatly benefit range pruning when reading based on this ID.
>>
>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>> it's not deterministic, and it won't ensure new records get the next id
>> after the latest id already present in the storage (S3).
>>
>> Regards,
>> Felix K Jose
>>
>
