spark-user mailing list archives

From Sheel Pancholi <sheelst...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 13:45:45 GMT
Hello Russell,

Thank you so much again. That answers almost everything.

I was fully aware of the example I was quoting. My interpretation of
"unbounded", as I understood it from the docs, was: *"the DF for the stream
acts as an unconditionally unbounded table"*, i.e. regardless of the type of
operation (a transformation, whether a *map()* or a *reduceByKey()*) or the
output mode intended for the stream, the data would continue to accumulate
like a transaction table. With that interpretation in mind, I was confused
by the two statements: one says *~"the stream is treated like an unbounded
table"*, while the other says *"source data is discarded upon processing
and only minimal intermediate data is kept around"*.

With your crisp explanation + the following diagram of the Catalyst
Optimizer ...
[image: Catalyst Optimizer pipeline diagram]

...can I infer the following, *using the word count example from the Spark
docs*?
1. If, at time instant T0, 10 records arrive in the stream, then the input
table is of size 10 and, correspondingly, the code generation phase will
ultimately create a base RDD of 10 elements for the DF *lines*, and those
will be processed.
2. At time instant T1, the stream gets another 5 records and, because of the
two important features you mentioned (*complete mode* and the *groupBy()*
operation), the complete state is preserved; therefore the input table is of
size 15 and, during the code generation phase, *a new base RDD of 15
elements* is generated for the DF *lines*.
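
To make sure I am reading point 2 correctly, here is a minimal plain-Python
sketch of how I understand the running word count to behave across two
micro-batches. This is not Spark code and makes no claim about Catalyst
internals; the batch contents are made up for illustration, and the only
thing carried between batches is the counts dictionary, not the raw lines:

```python
# Plain-Python sketch of complete-mode word-count semantics (not Spark code).
# The state kept between micro-batches is only the running counts;
# the raw input lines are discarded once they have been folded in.
from collections import Counter

def process_batch(state, lines):
    """Fold one micro-batch of input lines into the running word counts."""
    for line in lines:
        state.update(line.split())
    # In complete mode, the entire result table is emitted after each batch.
    return state

counts = Counter()  # intermediate state only; no unbounded buffer of lines

# Time T0: a micro-batch of records arrives.
counts = process_batch(counts, ["cat dog", "cat"])
# Time T1: a later micro-batch; only the new lines are read,
# while the previously accumulated counts are reused.
counts = process_batch(counts, ["dog owl"])

print(dict(counts))  # {'cat': 2, 'dog': 2, 'owl': 1}
```

If this matches the actual semantics, then the result table after T1 covers
all 15 records even though only the 5 new records were read at T1.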

Please confirm.

Regards
Sheel



On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <russell.spitzer@gmail.com>
wrote:

>
> You are looking at the diagram without looking at the underlying request.
> The behavior of state collection is dependent on the request and the output
> mode of the query.
>
> In the example you cite
>
> val lines = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", 9999)
>   .load()
> // Split the lines into words
> val words = lines.as[String].flatMap(_.split(" "))
> // Generate running word count
> val wordCounts = words.groupBy("value").count()
>
> // Start running the query that prints the running counts to the console
> val query = wordCounts.writeStream
>   .outputMode("complete")
>   .format("console")
>   .start()
> query.awaitTermination()
>
>
> This code contains two important features which cause the whole state to
> be held. First, we have an aggregation: the groupBy operation is unbounded,
> which means an unlimited amount of state may need to be stored as the
> application proceeds. In addition, the output mode is "complete", which
> means that on every batch iteration the entire set of output results must
> be returned.
>
> The diagram is just showing which records are processed by the request at
> each time, along with the previous data that was required to build the state.
>
> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <sheelstera@gmail.com>
> wrote:
>
>> Hello,
>>
>> Along with what I sent before, I want to add that I went over the
>> documentation at
>> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>>
>>
>> Here is an excerpt:
>>
>> [image: Model]
>>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>>
>>> Note that Structured Streaming does not materialize the entire table.
>>> It reads the latest available data from the streaming data source,
>>> processes it incrementally to update the result, and then discards the
>>> source data. It only keeps around the minimal intermediate *state* data
>>> as required to update the result (e.g. intermediate counts in the earlier
>>> example).
>>>
>> My question is: on one hand, the diagram shows the input table to truly
>> be unbounded, with data constantly arriving into this "table". On the
>> other hand, the text says that the source data is discarded. What, then,
>> is the meaning of the unbounded table in the diagram, which shows
>> incremental data arriving and sitting in this unbounded input table?
>> Moreover, it also says that only the intermediate data (i.e. the
>> intermediate counts) is kept. This sounds contradictory in my head.
>>
>> Could you please clarify what is it ultimately supposed to be?
>>
>> Regards
>> Sheel
>>
>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <sheelstera@gmail.com>
>> wrote:
>>
>>> Hello Russell,
>>>
>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive
>>> video at https://www.youtube.com/watch?v=RmUn5vHlevc and that along
>>> with your explanation made me realize that the DataFrame is the new
>>> DStream in Structured Streaming. If my understanding is correct, could
>>> you clarify the 2 points below:
>>>
>>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>>> process, and then at time instant T2 you have 5 newer items streaming in to
>>> be processed. *Structured Streaming* says that the DF is treated as an
>>> unbounded table and hence 15 items will be processed together. Does this
>>> mean that on Iteration 1 (i.e. time instant T1) the Catalyst Optimizer in
>>> the code generation phase creates an RDD of 10 elements and on Iteration 2
>>> ( i.e. time instant T2 ), the Catalyst Optimizer creates an RDD of 15
>>> elements?
>>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>>> data ..."*  Could you elaborate a bit more on what this static piece of
>>> data refers to? Are you referring to the 10 records that had already
>>> arrived at T1 and are now sitting as old static data in the unbounded
>>> table?
>>>
>>> Regards
>>> Sheel
>>>
>>>
>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>> Dataframes describe the calculation to be done, but the underlying
>>>> implementation is an "incremental query". That is, the DataFrame code
>>>> is executed repeatedly, with Catalyst adjusting the final execution plan
>>>> on each run. Some parts of the plan refer to static pieces of data;
>>>> others refer to data which is pulled in on each iteration. None of this
>>>> changes the DataFrame objects themselves.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <sheelstera@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>> Structured Streaming treats a stream as an unbounded table in the form
>>>>> of a DataFrame. Continuously flowing data from the stream keeps getting
>>>>> added to this DataFrame (the unbounded table), which warrants a change
>>>>> to the DataFrame; this violates the very basic nature of a DataFrame,
>>>>> since a DataFrame is by nature immutable. This sounds contradictory. Is
>>>>> there an explanation for this?
>>>>>
>>>>> Regards
>>>>> Sheel
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best Regards,
>>>
>>> Sheel Pancholi
>>>
>>> *Mob: +91 9620474620*
>>>
>>> *Connect with me on: Twitter <https://twitter.com/sheelstera> | LinkedIn
>>> <http://in.linkedin.com/in/sheelstera>*
>>>
>>> *Write to me at:Sheel@Yahoo!! <sheelpancholi@yahoo.com> | Sheel@Gmail!!
>>> <sheelstera@gmail.com> | Sheel@Windows Live <sheelstera@live.com>*
>>>
>>> P *Save a tree* - please do not print this email unless you really need
>>> to!
>>>
>>
>>
>>
>

