spark-user mailing list archives

From Eric Beabes <mailinglist...@gmail.com>
Subject Re: Stream which needs to be “joined” with another Stream of “Reference” data.
Date Mon, 03 May 2021 18:48:43 GMT
I was looking for something like "Processing Time Temporal Join" in Flink
as described here:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#processing-time-temporal-join
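
To make the semantics I am after concrete, here is a minimal pure-Python sketch of a processing-time temporal join: each event is enriched with whatever the reference table holds at the moment the event is processed. It is not Spark or Flink code. The names `ReferenceTable` and `temporal_join`, the `location` column and the sample values are all made up for illustration, and the left-join behaviour (unknown devices get `None`) is just one possible choice.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator, Optional, Tuple


@dataclass
class ReferenceTable:
    """Hypothetical stand-in for the DEVICE table: device_id -> location."""
    rows: Dict[str, str] = field(default_factory=dict)

    def upsert(self, device_id: str, location: str) -> None:
        # Later versions simply overwrite earlier ones.
        self.rows[device_id] = location


def temporal_join(
    events: Iterable[Tuple[str, float]],
    reference: ReferenceTable,
) -> Iterator[Tuple[str, float, Optional[str]]]:
    """Enrich each (device_id, reading) with the reference value visible
    at the time the event is processed (left-join flavour)."""
    for device_id, reading in events:
        yield device_id, reading, reference.rows.get(device_id)


devices = ReferenceTable()
devices.upsert("d1", "lab")

# First batch is processed while d1 is still in the lab.
out = list(temporal_join([("d1", 20.5)], devices))

# The reference table changes between batches.
devices.upsert("d1", "roof")

# Second batch sees the new version; d2 is unknown and gets None.
out += list(temporal_join([("d1", 21.0), ("d2", 3.0)], devices))
```

The point is that earlier output is never revised when the table changes; only events processed after the change see the new row.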


On Mon, May 3, 2021 at 11:07 AM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> You are welcome Yuri. However, I stand corrected :)
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 3 May 2021 at 19:02, "Yuri Oleynikov (יורי אולייניקוב)" <yurkao@gmail.com> wrote:
>
>> Always nice to learn something new about jdbc.
>> Thanks, Mich **thumbsup**
>>
>>
>> On 3 May 2021, at 20:54, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>> 
>> I would have assumed that reference data such as device_id is fairly
>> static, so a snapshot will do.
>>
>> The JDBC connection is lazy, so it will not materialise until the join
>> uses it. The data will then be read from the underlying RDBMS table for
>> COMMITTED transactions.
>>
>> However, this is something that I discussed in another thread,
>>
>> *Spark Streaming with Files*
>>
>> There is an option to trigger the query once:
>>
>>               from pyspark.sql.functions import col
>>
>>               # Run a single trigger over all available data, then stop;
>>               # the checkpoint records what has already been processed.
>>               result = streamingDataFrame.select( \
>>                      col("parsed_value.rowkey").alias("rowkey") \
>>                    , col("parsed_value.ticker").alias("ticker") \
>>                    , col("parsed_value.timeissued").alias("timeissued") \
>>                    , col("parsed_value.price").alias("price")). \
>>                      writeStream. \
>>                      outputMode('append'). \
>>                      option("truncate", "false"). \
>>                      foreachBatch(sendToSink). \
>>                      queryName('trailFiles'). \
>>                      trigger(once = True). \
>>                      option('checkpointLocation', checkpoint_path). \
>>                      start(data_path)
>>
>> This means that the streaming job will run for all the data collected so
>> far and then terminate. In that case the JDBC connection is refreshed each
>> time the streaming process is restarted to pick up unprocessed data, at
>> whatever batch interval you choose, and critically your JDBC snapshot is
>> re-read on each run.
>>
>> The restarts can be scheduled through Airflow etc. You won't lose data, as
>> the checkpoint marks processed records.
>>
>> That might be an option.
>>
>> HTH
>>
>> On Mon, 3 May 2021 at 18:27, "Yuri Oleynikov (יורי אולייניקוב)" <yurkao@gmail.com> wrote:
>>
>>> You can do the enrichment with a stream (events) to static (device table)
>>> join. When the device table is a slowly changing dimension (let’s say it
>>> changes once a day) and it’s in Delta format, then for every micro-batch
>>> of the stream-static join the device table will be rescanned and
>>> up-to-date device data will be loaded.
>>>
>>> If the device table is not a slowly changing dimension (say it changes
>>> once an hour), then you’d probably need a stream-stream join, but I’m not
>>> sure whether RDBMS (aka JDBC) sources in Spark support streaming mode.
>>> So I’d rather sync JDBC to Parquet/Delta periodically in order to
>>> emulate a streaming source.
>>>
>>>
>>> On 3 May 2021, at 20:02, Eric Beabes <mailinglists19@gmail.com> wrote:
>>>
>>> 
>>> 1) Device_id might be different for messages in a batch.
>>> 2) It's a Streaming application. The IOT messages are getting read in a
>>> Structured Streaming job in a "Stream". The Dataframe would need to be
>>> updated every hour. Have you done something similar in the past? Do you
>>> have an example to share?
>>>
>>> On Mon, May 3, 2021 at 9:52 AM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Can you please clarify:
>>>>
>>>>
>>>>    1. Do the IOT messages in one batch all have the same device_id, or
>>>>    does every row have a different device_id?
>>>>    2. The RDBMS table can be read through JDBC in Spark and a
>>>>    dataframe can be created on it. Does that work for you? You do not
>>>>    really need to stream the reference table.
>>>>
>>>>
>>>> HTH
>>>>
>>>> On Mon, 3 May 2021 at 17:37, Eric Beabes <mailinglists19@gmail.com>
>>>> wrote:
>>>>
>>>>> I would like to develop a Spark Structured Streaming job that reads
>>>>> messages in a Stream which needs to be “joined” with another Stream of
>>>>> “Reference” data.
>>>>>
>>>>> For example, let’s say I’m reading messages from Kafka coming in from
>>>>> (lots of) IOT devices. This message has a ‘device_id’. We have a DEVICE
>>>>> table on a relational database. What I need to do is “join” the ‘device_id’
>>>>> in the message with the ‘device_id’ on the table to enrich the incoming
>>>>> message. Somewhere I read that, this can be done by joining two streams. I
>>>>> guess, we can create a “Stream” that reads the DEVICE table once every hour
>>>>> or so.
>>>>> Questions:
>>>>> 1) Is this the right way to solve this use case?
>>>>> 2) Should we use a Stateful Stream for reading DEVICE table with State
>>>>> timeout set to an hour?
>>>>> 3) What would happen while the DEVICE state is getting updated from
>>>>> the table on the relational database?
>>>>>
>>>>> Guidance would be greatly appreciated. Thanks.
>>>>>
>>>>
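
For the hourly refresh of the DEVICE lookup discussed in this thread, one possible shape is a small cache that reloads its snapshot once a time-to-live has elapsed. The sketch below is plain Python with an injectable clock so it can be exercised without Spark. `RefreshingLookup`, `load_devices` and the `v1`/`v2` values are hypothetical names for illustration; in a real job the loader would presumably re-read the table over JDBC and `get()` would be called from the function passed to `foreachBatch`, but that wiring is an assumption and is not shown here.

```python
import time
from typing import Callable, Dict, List, Optional


class RefreshingLookup:
    """Caches a reference snapshot and reloads it after ttl_seconds."""

    def __init__(
        self,
        loader: Callable[[], Dict[str, str]],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._snapshot: Optional[Dict[str, str]] = None
        self._loaded_at: Optional[float] = None

    def get(self) -> Dict[str, str]:
        now = self._clock()
        # Reload on first use, or once the snapshot is at least ttl old.
        if self._loaded_at is None or now - self._loaded_at >= self._ttl:
            self._snapshot = self._loader()
            self._loaded_at = now
        assert self._snapshot is not None
        return self._snapshot


# Fake clock and loader so the behaviour can be checked deterministically.
fake_now: List[float] = [0.0]
loads: List[float] = []


def load_devices() -> Dict[str, str]:
    # Hypothetical loader; records when it ran and returns a versioned row.
    loads.append(fake_now[0])
    return {"d1": f"v{len(loads)}"}


lookup = RefreshingLookup(load_devices, ttl_seconds=3600,
                          clock=lambda: fake_now[0])

first = lookup.get()["d1"]    # initial load
fake_now[0] = 1800.0
second = lookup.get()["d1"]   # within the hour: cached snapshot reused
fake_now[0] = 3600.0
third = lookup.get()["d1"]    # an hour later: snapshot reloaded
```

Batches that arrive while a reload is in progress simply wait for `get()` to return, so every batch joins against one consistent snapshot, either the old one or the new one.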
