spark-user mailing list archives

From Sheel Pancholi <sheelst...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 16:47:25 GMT
Hello Russell and Stephen,

Thanks a lot.

This has been a lot of learning already. I tried it on my EC2 instance and
then went straight into the corresponding Structured Streaming source code.

Here is my understanding. Although the points below are written about the
base DF only, the same applies to all subsequent DFs in the DAG:

For the program:

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()


*Observations:*
- To the user, i.e. the one actually writing the code, it looks like the
*lines* DF is the same single DF being magically mutated as and when the
stream data keeps arriving, i.e. the concept of the unbounded table.
- Deep down, after *WholeStageCodeGeneration*, it is always the
*StateStoreRDD* + *<current batch RDD>*, *on each iteration*, that does the
trick. The *StateStoreRDD* is the one that keeps the "relevant and needed"
state/history. The *<current batch RDD>*, i.e. the *DataSourceRDD*, reflects
the current batch of data that has arrived in the stream.

And, like I mentioned, this applies to all the DFs in the DAG for each
iteration. The point I was trying to drive home was that the *lines* DF
"actually results" in a *new DataSourceRDD* for every iteration/batch after
it goes through the Catalyst Optimizer, and the user is shielded from that
detail by the *lines* DF abstraction.
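
As a quick way to check this in spark-shell, a minimal sketch using the
standard StreamingQuery API (the stateful aggregation shows up as
StateStore* operators in the printed plan):

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

// After at least one batch has run, print the physical plan of the most
// recent execution; each batch pairs a fresh source scan with the state store.
query.explain()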

Please confirm whether my understanding and inference are right. The
visuals from the UI really helped.

Regards
Sheel

On Thu, May 16, 2019 at 9:34 PM Stephen Boesch <javadba@gmail.com> wrote:

> > kafka
> Is that true regardless of the outputMode, even "complete"? I have been
> working with Kafka to try to empirically understand the behavior, but that
> is a less than ideal approach: knowing the *intent* is a better starting
> point. Thx!
>
> On Thu, May 16, 2019 at 8:51 AM Russell Spitzer <
> russell.spitzer@gmail.com> wrote:
>
>> Kafka only pulls from the last offset to the most recent (or the
>> throttled next point), so it would also only be pulling in a unique set of
>> data each batch. In case of failure, Spark just re-reads the batch's offset
>> interval from Kafka.
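>>
>> A minimal sketch of that setup (the topic name "words" is hypothetical
>> here; "maxOffsetsPerTrigger" is the standard option behind the throttling
>> mentioned above):
>>
>> val kafkaLines = spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "words")
>>   .option("maxOffsetsPerTrigger", "1000") // cap on offsets read per micro-batch
>>   .load()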
>>
>> On Thu, May 16, 2019 at 10:45 AM Stephen Boesch <javadba@gmail.com>
>> wrote:
>>
>>> > The exact source RDD (or rdds created) is determined by the Relation
>>> implementation of the datasource being read
>>>
>>> That is something I completely missed. In any case, I have been working
>>> exclusively with Kafka, which I believe (but will confirm using your
>>> suggestion to look at the GUI) does support retention across all batches.
>>> There are a lot of moving parts/variations of behavior in SSS.
>>>
>>> On Thu, May 16, 2019 at 7:25 AM Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>> The plan may be several RDDs, and the state itself will be stored
>>>> separately; you can always check this by looking at the UI when running
>>>> your job. The Stages tab will show you exactly which RDDs are created. The
>>>> exact source RDD (or RDDs) created is determined by the Relation
>>>> implementation of the datasource being read. In the case of the socket
>>>> reader, it will only keep the entries for the batch window being
>>>> processed. So you will get 10 records for the first batch, and 5 for the
>>>> second. We can see all of this in the UI.
>>>>
>>>> I recommend actually booting up Spark and doing this. Here is a run I
>>>> just did in the spark-shell in local mode.
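>>>>
>>>> To reproduce it, a minimal sketch, assuming netcat is available: run
>>>> `nc -lk 9999` in one terminal, then paste the word-count example into
>>>> spark-shell:
>>>>
>>>> val lines = spark.readStream.format("socket")
>>>>   .option("host", "localhost").option("port", 9999).load()
>>>> val query = lines.as[String].flatMap(_.split(" "))
>>>>   .groupBy("value").count()
>>>>   .writeStream.outputMode("complete").format("console").start()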
>>>>
>>>> As the query runs, each execution is translated into a job with stages:
>>>>
>>>>
>>>> [image: image.png]
>>>>
>>>>
>>>> Inside our stage we can see the RDD's Created
>>>>
>>>> [image: image.png]
>>>>
>>>> The DataSource RDD represents the data being added to the groupBy
>>>>
>>>> This data is then combined with our state store in the second stage:
>>>>
>>>> [image: image.png]
>>>>
>>>> As we continue typing more data into netcat, the shuffle write (the
>>>> amount of data passed into the grouping aggregation)
>>>>
>>>> [image: image.png]
>>>>
>>>> is only equal to the number of rows read in that batch. Notice how it is
>>>> empty when no words are typed, larger when more words are typed, and
>>>> smaller when fewer are.
>>>>
>>>> I encourage you to actually load this up and play around with the UI to
>>>> get a feel for it.
>>>>
>>>> On Thu, May 16, 2019 at 8:46 AM Sheel Pancholi <sheelstera@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Russell,
>>>>>
>>>>> Thank you so much again. That answers almost everything.
>>>>>
>>>>> I was fully aware of the example I was quoting. My interpretation of
>>>>> "unbounded", as I understood it from the docs, was: *"the DF for the
>>>>> stream acts as an unconditionally unbounded table"*, i.e. regardless of
>>>>> the type of operation (a transformation, whether it's a *map()* or a
>>>>> *reduceByKey()*) or the output mode intended upon the stream, the
>>>>> data/stream would continue to be accumulated like a transaction table.
>>>>> With that interpretation in mind, there was confusion between the
>>>>> statements: one said *~"the stream is treated like an unbounded
>>>>> table"*, while the other said *"source data is discarded upon
>>>>> processing and only minimal intermediate data is kept around"*.
>>>>>
>>>>> With your crisp explanation + the following diagram of the Catalyst
>>>>> Optimizer ...
>>>>> [image: image.png]
>>>>>
>>>>> ...can I infer the following, *using the example of word count from
>>>>> the Spark Docs*?
>>>>> 1. If at time instant T0, 10 records arrive in the stream, then the
>>>>> input table is of size 10 and, correspondingly, the code generation
>>>>> phase will ultimately create a base RDD of 10 elements for the DF
>>>>> *lines*, and those will be processed.
>>>>> 2. At time instant T1, the stream gets another 5 records and, because
>>>>> of the two important features (*complete mode* and the *groupBy()
>>>>> operation*), the complete state is preserved; therefore the input table
>>>>> is of size 15 and, during the code generation phase, *a new base RDD of
>>>>> 15 elements* is generated for the DF *lines*.
>>>>>
>>>>> Please confirm.
>>>>>
>>>>> Regards
>>>>> Sheel
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <
>>>>> russell.spitzer@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> You are looking at the diagram without looking at the underlying
>>>>>> request. The behavior of state collection depends on the request and
>>>>>> the output mode of the query.
>>>>>>
>>>>>> In the example you cite
>>>>>>
>>>>>> val lines = spark.readStream
>>>>>>   .format("socket")
>>>>>>   .option("host", "localhost")
>>>>>>   .option("port", 9999)
>>>>>>   .load()
>>>>>>
>>>>>> // Split the lines into words
>>>>>> val words = lines.as[String].flatMap(_.split(" "))
>>>>>>
>>>>>> // Generate running word count
>>>>>> val wordCounts = words.groupBy("value").count()
>>>>>>
>>>>>> // Start running the query that prints the running counts to the console
>>>>>> val query = wordCounts.writeStream
>>>>>>   .outputMode("complete")
>>>>>>   .format("console")
>>>>>>   .start()
>>>>>> query.awaitTermination()
>>>>>>
>>>>>>
>>>>>> This code contains two important features which cause the whole state
>>>>>> to be held. First, we have an aggregation. The groupBy operation is
>>>>>> unbounded. This means an unlimited amount of state will be required to
>>>>>> be stored as the application proceeds. In addition, the output mode is
>>>>>> "complete". This means on every batch iteration the entire set of
>>>>>> output results must be returned.
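>>>>>>
>>>>>> For contrast, a minimal sketch of the other documented output mode for
>>>>>> aggregations, "update", which would emit only the rows whose counts
>>>>>> changed in the current batch:
>>>>>>
>>>>>> val updateQuery = wordCounts.writeStream
>>>>>>   .outputMode("update") // emit only rows updated in this batch
>>>>>>   .format("console")
>>>>>>   .start()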
>>>>>>
>>>>>> The diagram is just showing which records are processed by the request
>>>>>> at each time, along with the previous data that was required to build
>>>>>> the state.
>>>>>>
>>>>>> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <sheelstera@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Along with what I sent before, I want to add that I went over the
>>>>>>> documentation at
>>>>>>> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>>>>>>>
>>>>>>>
>>>>>>> Here is an excerpt:
>>>>>>>
>>>>>>> [image: Model]
>>>>>>>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>>>>>>>
>>>>>>>> Note that Structured Streaming does not materialize the entire table.
>>>>>>>> It reads the latest available data from the streaming data source,
>>>>>>>> processes it incrementally to update the result, and then discards the
>>>>>>>> source data. It only keeps around the minimal intermediate *state* data
>>>>>>>> as required to update the result (e.g. intermediate counts in the
>>>>>>>> earlier example).
>>>>>>>>
>>>>>>> My question is: on one hand, the diagram shows the input table to
>>>>>>> truly be unbounded by constantly letting the data arrive into this
>>>>>>> "table". But, on the other hand, it also says that it discards the
>>>>>>> source data. Then, what is the meaning of the unbounded table in the
>>>>>>> diagram above, showing incremental data arriving and sitting in this
>>>>>>> unbounded input table? Moreover, it also says that it keeps only the
>>>>>>> intermediate data (i.e. the intermediate counts). This sounds
>>>>>>> contradictory in my head.
>>>>>>>
>>>>>>> Could you please clarify what is it ultimately supposed to be?
>>>>>>>
>>>>>>> Regards
>>>>>>> Sheel
>>>>>>>
>>>>>>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <sheelstera@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Russell,
>>>>>>>>
>>>>>>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive
>>>>>>>> video at https://www.youtube.com/watch?v=RmUn5vHlevc and that, along
>>>>>>>> with your explanation, made me realize that the DataFrame is the new
>>>>>>>> DStream in Structured Streaming. If my understanding is correct, I
>>>>>>>> request you to clarify the 2 points below:
>>>>>>>>
>>>>>>>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>>>>>>>> process, and then at time instant T2 you have 5 newer items streaming
>>>>>>>> in to be processed. *Structured Streaming* says that the DF is
>>>>>>>> treated as an unbounded table and hence 15 items will be processed
>>>>>>>> together. Does this mean that on Iteration 1 (i.e. time instant T1)
>>>>>>>> the Catalyst Optimizer, in the code generation phase, creates an RDD
>>>>>>>> of 10 elements, and on Iteration 2 (i.e. time instant T2) it creates
>>>>>>>> an RDD of 15 elements?
>>>>>>>> 2. You mentioned *"Some parts of the plan refer to static pieces
>>>>>>>> of data ..."* Could you elaborate a bit more on what this static
>>>>>>>> piece of data refers to? Are you referring to the 10 records that had
>>>>>>>> already arrived at T1 and are now sitting as old static data in the
>>>>>>>> unbounded table?
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Sheel
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>>>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Dataframes describe the calculation to be done, but the underlying
>>>>>>>>> implementation is an "Incremental Query". That is, the dataframe
>>>>>>>>> code is executed repeatedly, with Catalyst adjusting the final
>>>>>>>>> execution plan on each run. Some parts of the plan refer to static
>>>>>>>>> pieces of data, others refer to data which is pulled in on each
>>>>>>>>> iteration. None of this changes the DataFrame objects themselves.
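>>>>>>>>>
>>>>>>>>> Given a running StreamingQuery `query` (e.g. from the word-count
>>>>>>>>> example), a minimal sketch of how to watch the per-iteration
>>>>>>>>> behavior while the DataFrame itself never changes (recentProgress
>>>>>>>>> is part of the standard StreamingQuery API):
>>>>>>>>>
>>>>>>>>> query.recentProgress.foreach { p =>
>>>>>>>>>   println(s"batch=${p.batchId} inputRows=${p.numInputRows}")
>>>>>>>>> }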
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <
>>>>>>>>> sheelstera@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>> Structured Streaming treats a stream as an unbounded table in the
>>>>>>>>>> form of a DataFrame. Continuously flowing data from the stream
>>>>>>>>>> keeps getting added to this DataFrame (which is the unbounded
>>>>>>>>>> table), which warrants a change to the DataFrame and violates the
>>>>>>>>>> very basic nature of a DataFrame, since a DataFrame by its nature
>>>>>>>>>> is immutable. This sounds contradictory. Is there an explanation
>>>>>>>>>> for this?
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> Sheel
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>

-- 

Best Regards,

Sheel Pancholi

*Mob: +91 9620474620*

*Connect with me on: Twitter <https://twitter.com/sheelstera> | LinkedIn
<http://in.linkedin.com/in/sheelstera>*

*Write to me at: Sheel@Yahoo!! <sheelpancholi@yahoo.com> | Sheel@Gmail!!
<sheelstera@gmail.com> | Sheel@Windows Live <sheelstera@live.com>*

P *Save a tree* - please do not print this email unless you really need to!
