spark-user mailing list archives

From Russell Spitzer <russell.spit...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 13:03:35 GMT
You are looking at the diagram without looking at the underlying query.
How much state is collected depends on the query and on its output mode.

In the example you cite:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._  // needed for lines.as[String]

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()


This code contains two important features which cause the whole state to be
held. First, there is an aggregation, and the groupBy is unbounded: nothing
(such as a watermark) ever allows old keys to be dropped, so the state, one
running count per distinct word, can keep growing without limit as the
application proceeds. Second, the output mode is "complete". This means that
on every batch the entire result table must be written out, not just the rows
that changed.
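To make the point about state concrete, here is a minimal plain-Scala sketch. It does not use Spark, and the names `IncrementalWordCount` and `runBatch` are hypothetical, not part of any Spark API. It models one micro-batch of the word count: only the new lines are read, they are folded into a map holding one running count per distinct word, and the lines themselves are then dropped. The `outputMode` string mimics the difference between "complete" (emit the whole result table every batch) and "update" (emit only the rows that changed). It illustrates the semantics only, not how Spark's state store is implemented.

```scala
// Plain-Scala sketch, no Spark involved. IncrementalWordCount and runBatch are
// hypothetical names used only to model the semantics of a running word count.
object IncrementalWordCount {
  // The only thing kept between batches: one running count per distinct word.
  type State = Map[String, Long]

  // Processes one micro-batch against the previous state and returns
  // (newState, rowsEmittedToTheSink) for the given output mode.
  def runBatch(state: State, batch: Seq[String], outputMode: String): (State, Map[String, Long]) = {
    // Count words in the new lines only; the lines are not stored anywhere.
    val batchCounts: Map[String, Long] =
      batch.flatMap(_.split(" ")).filter(_.nonEmpty)
        .groupBy(identity)
        .map { case (word, occurrences) => word -> occurrences.size.toLong }

    // Fold the batch counts into the running state.
    val newState = batchCounts.foldLeft(state) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) + n)
    }

    val emitted = outputMode match {
      // "complete": the entire result table is emitted on every batch.
      case "complete" => newState
      // "update": only the rows whose counts changed in this batch.
      case "update" => newState.filter { case (word, _) => batchCounts.contains(word) }
      case other => throw new IllegalArgumentException(s"unsupported output mode: $other")
    }
    (newState, emitted)
  }
}

// Two micro-batches; the first batch's lines are gone once runBatch returns.
val (afterT1, emittedT1) = IncrementalWordCount.runBatch(Map.empty, Seq("cat dog", "dog"), "complete")
val (afterT2, emittedT2) = IncrementalWordCount.runBatch(afterT1, Seq("owl dog"), "complete")
val (_, updateT2) = IncrementalWordCount.runBatch(afterT1, Seq("owl dog"), "update")

println(s"complete mode at T2: ${emittedT2.toSeq.sorted}")
println(s"update mode at T2:   ${updateT2.toSeq.sorted}")
```

In this model the state grows with the number of distinct words, not with the number of lines received, which is the sense in which an unbounded groupBy keeps growing while never needing the raw input again.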

The diagram is just showing which records are processed by the query at each
point in time, along with the earlier data that was needed to build the state.
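The relationship between the diagram's unbounded input table and the retained state can be checked with a second plain-Scala sketch (again no Spark; `UnboundedTableModel`, `countOverWholeTable` and `step` are hypothetical names). `countOverWholeTable` stands for the conceptual semantics: run the query over every row received so far. `step` stands for incremental execution: read only the rows that arrived at the current trigger and fold them into the previous counts. The assertion checks that both give the same result after each trigger, even though the incremental side never looks at earlier batches again.

```scala
// Plain-Scala sketch, no Spark involved. The names below are hypothetical and
// only model "unbounded table" semantics versus incremental execution.
object UnboundedTableModel {
  // Conceptual semantics: apply the query to all input received so far.
  def countOverWholeTable(allLines: Seq[String]): Map[String, Long] =
    allLines.flatMap(_.split(" ")).filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  // Incremental execution: only the new lines are read; older batches are
  // represented solely by the per-word counts carried in the state.
  def step(state: Map[String, Long], newLines: Seq[String]): Map[String, Long] =
    countOverWholeTable(newLines).foldLeft(state) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) + n)
    }
}

val batches: Seq[Seq[String]] = Seq(
  Seq("a b", "b c"), // arrives at trigger T1
  Seq("c c", "d")    // arrives at trigger T2
)

// Incremental: the state after each trigger (tail drops the empty initial state).
val statesPerTrigger = batches.scanLeft(Map.empty[String, Long])(UnboundedTableModel.step).tail

// Conceptual: recompute over the growing "table" made of every batch so far.
val recomputed = batches.indices.map(i => UnboundedTableModel.countOverWholeTable(batches.take(i + 1).flatten))

assert(statesPerTrigger == recomputed)
println(statesPerTrigger.last.toSeq.sorted)
```

In this model the second trigger reads only the rows of the second batch; the first batch survives only as counts in the state, which is how the "unbounded table" can be the meaning of the query without being materialized.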

On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <sheelstera@gmail.com> wrote:

> Hello,
>
> Along with what I sent before, I want to add that I went over the
> documentation at
> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>
>
> Here is an excerpt:
>
> [image: Model]
>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>
>> Note that Structured Streaming does not materialize the entire table. It
>> reads the latest available data from the streaming data source, processes
>> it incrementally to update the result, and then discards the source data.
>> It only keeps around the minimal intermediate *state* data as required
>> to update the result (e.g. intermediate counts in the earlier example).
>>
> My question is: on one hand, the diagram shows the input table to be truly
> unbounded, with data constantly arriving into this "table". On the other
> hand, the guide says that it discards the source data. What, then, is the
> meaning of the unbounded table in the diagram, with incremental data
> arriving and sitting in this unbounded input table? Moreover, it says that
> it keeps only the intermediate data (i.e. the intermediate counts). This
> sounds contradictory to me.
>
> Could you please clarify what it is ultimately supposed to be?
>
> Regards
> Sheel
>
> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <sheelstera@gmail.com>
> wrote:
>
>> Hello Russell,
>>
>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive video
>> at https://www.youtube.com/watch?v=RmUn5vHlevc and that, along with your
>> explanation, made me realize that the DataFrame is the new DStream in
>> Structured Streaming. If my understanding is correct, could you please
>> clarify the two points below:
>>
>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>> process, and then at time instant T2 you have 5 newer items streaming in to
>> be processed. *Structured Streaming* says that the DF is treated as an
>> unbounded table and hence 15 items will be processed together. Does this
>> mean that on iteration 1 (i.e. time instant T1) the Catalyst Optimizer, in
>> the code generation phase, creates an RDD of 10 elements and on iteration 2
>> (i.e. time instant T2) creates an RDD of 15 elements?
>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>> data ..."*. Could you elaborate a bit more on what this static piece of
>> data refers to? Are you referring to the 10 records that had already
>> arrived at T1 and are now sitting as old static data in the unbounded
>> table?
>>
>> Regards
>> Sheel
>>
>>
>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>> DataFrames describe the calculation to be done, but the underlying
>>> implementation is an "Incremental Query". That is, the DataFrame code
>>> is executed repeatedly, with Catalyst adjusting the final execution plan on
>>> each run. Some parts of the plan refer to static pieces of data, others
>>> refer to data which is pulled in on each iteration. None of this changes
>>> the DataFrame objects themselves.
>>>
>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <sheelstera@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>> Structured Streaming treats a stream as an unbounded table in the form
>>>> of a DataFrame. Continuously flowing data from the stream keeps getting
>>>> added to this DataFrame (the unbounded table), which implies a change
>>>> to the DataFrame. That violates the very basic nature of a DataFrame,
>>>> since a DataFrame is by nature immutable. This sounds contradictory. Is
>>>> there an explanation for this?
>>>>
>>>> Regards
>>>> Sheel
>>>>
>>>
>>
>> --
>>
>> Best Regards,
>>
>> Sheel Pancholi
>>
>> *Mob: +91 9620474620*
>>
>> *Connect with me on: Twitter <https://twitter.com/sheelstera> | LinkedIn
>> <http://in.linkedin.com/in/sheelstera>*
>>
>> *Write to me at:Sheel@Yahoo!! <sheelpancholi@yahoo.com> | Sheel@Gmail!!
>> <sheelstera@gmail.com> | Sheel@Windows Live <sheelstera@live.com>*
>>
>> *Save a tree* - please do not print this email unless you really need
>> to!
>>
>
