spark-user mailing list archives

From Stephen Boesch <java...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 17:26:46 GMT
The way you describe the RDDs seems to be "yes, immutable if it's a
deterministic source" and "otherwise no".    This affects non-SSS RDDs as
well and leads to confusion.   For example, I'm still not clear on how to
generate a random-number-based column and then ensure the same result
comes up in later SQL invocations: the way I've tried is to persist the
RDD (e.g. to S3 or HDFS) and then read it fresh into a new one from the
persistent store; anything memory-based does change each time, cached or
not.
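
A rough sketch of the workaround I mean (untested; the bucket path and
column names are just placeholders):

import org.apache.spark.sql.functions.rand

// A rand()-based column can come out differently on later invocations,
// since the column is recomputed whenever the plan is re-executed.
val withRandom = spark.range(0, 1000).withColumn("score", rand())

// Pin the values down by writing them to durable storage once ...
withRandom.write.mode("overwrite").parquet("s3://some-bucket/tmp/with_random")

// ... and reading them back fresh, so later SQL invocations see the same data.
val frozen = spark.read.parquet("s3://some-bucket/tmp/with_random")
frozen.createOrReplaceTempView("frozen_random")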

In the case of Kafka/SSS, my assumption is that the offsets maintained by
the internal consumer managed by SSS are used to pull the latest
records from Kafka. If the output mode is "complete", then the aggregates are
computed across all prior reads from Kafka as well as the current one.




On Thu, May 16, 2019 at 10:13 AM Russell Spitzer <
russell.spitzer@gmail.com> wrote:

> The DataFrame is never mutated. You are thinking of the DataFrame as if it
> holds the data inside it, but that's not what it is. A DataFrame (and an RDD)
> is a description of how a computation should be done. In Structured Streaming,
> executing the DataFrame produces a different query execution depending on the
> iteration number and the current state. That's why you see a new RDD every
> time: each execution corresponds to a different series of RDDs
> which interact.
>
> But even though the RDDs are also immutable, their underlying
> implementation can cause different data to be drawn depending on when they
> are executed (this is again source-dependent). I could, for example, write a
> source which produces random data. Although my RDD would be immutable, every
> execution would produce different items. This is relevant when your RDDs
> are references to files or databases whose underlying state may change
> between executions. In these cases, even though the RDD is the same and can
> be replayed, the end result may not be identical.
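>
> As a rough illustration of that principle with a plain (non-streaming) RDD --
> just a sketch:
>
> // The RDD is immutable: it is only a description of the computation.
> val randomRdd = spark.sparkContext
>   .parallelize(1 to 5)
>   .map(_ => scala.util.Random.nextDouble())
>
> // But each execution of that description draws fresh random numbers.
> randomRdd.collect()  // one set of values
> randomRdd.collect()  // a different set of values, same immutable RDD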
>
> On Thu, May 16, 2019 at 11:48 AM Sheel Pancholi <sheelstera@gmail.com>
> wrote:
>
>> Hello Russell and Stephen,
>>
>> Thanks a lot.
>>
>> This has been a lot of learning already. I did try it on my EC2 instance
>> and went straight into the corresponding source code of Structured
>> Streaming.
>>
>> Here is my understanding, and although I am writing the below points
>> about the base DF only, the same applies to all subsequent DFs in the DAG:
>>
>> For the program:
>>
>> // Create DataFrame representing the stream of input lines from connection to localhost:9999
>> val lines = spark.readStream
>>   .format("socket")
>>   .option("host", "localhost")
>>   .option("port", 9999)
>>   .load()
>>
>> // Split the lines into words
>> val words = lines.as[String].flatMap(_.split(" "))
>>
>> // Generate running word count
>> val wordCounts = words.groupBy("value").count()
>>
>>
>> *Observations:*
>> - To the user, i.e. the person actually writing the code, it looks like
>> the *lines* DF is the same single DF being magically mutated as and when
>> the stream data keeps arriving, i.e. the concept of the unbounded table.
>> - Deep down, after *WholeStageCodeGeneration*, it is always the
>> *StateStoreRDD* + *<current batch RDD>*, *on each iteration*, that is doing
>> the trick. The *StateStoreRDD* is the one that keeps the "relevant and
>> needed" state/history. The *<current batch RDD>*, i.e. the *DataSourceRDD*,
>> reflects the current batch of data that has arrived in the stream.
>>
>> And, like I mentioned, this applies to all the DFs in the DAG for each
>> iteration. The point I was trying to drive home was that the *lines* DF
>> "actually results" in a *new DataSourceRDD* for every iteration/batch
>> after it goes through the Catalyst Optimizer, and the user is abstracted
>> away from that through the *lines* DF.
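>>
>> (A sketch of how this can be inspected, in case it helps: query.explain() on
>> the running StreamingQuery prints the physical plan of the latest batch, where
>> the state-store and data-source operators show up, and query.lastProgress
>> shows the per-batch numbers.)
>>
>> // Assuming the `wordCounts` DF from the snippet above; console sink only for illustration.
>> val query = wordCounts.writeStream
>>   .outputMode("complete")
>>   .format("console")
>>   .start()
>>
>> // After a batch or two has run:
>> query.explain()     // physical plan of the most recent execution
>> query.lastProgress  // per-batch metrics: input rows, state rows, batch id, etc.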
>>
>> Please confirm if my understanding/inference is right. The visuals from the
>> UI really helped.
>>
>> Regards
>> Sheel
>>
>> On Thu, May 16, 2019 at 9:34 PM Stephen Boesch <javadba@gmail.com> wrote:
>>
>>> > kafka
>>> Is that true regardless of the outputMode, even "complete"? I have been
>>> working with Kafka to try to empirically understand the behavior, but that
>>> is a less-than-ideal approach: knowing the *intent* is a better
>>> starting point. Thanks!
>>>
>>> On Thu, May 16, 2019 at 8:51 AM Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>> Kafka only pulls from the last offset to the most recent (or
>>>> throttled next point), so it would also only be pulling in a unique set of
>>>> data each batch. In case of failure, Spark just re-reads the batch's offset
>>>> interval from Kafka.
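>>>>
>>>> A sketch of what that typically looks like (broker, topic and checkpoint
>>>> path are placeholders):
>>>>
>>>> // Each batch reads only the per-partition offset range planned for that batch;
>>>> // the range is recorded in the checkpoint, so a failed batch is re-read over
>>>> // exactly the same interval.
>>>> val kafkaLines = spark.readStream
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", "broker1:9092")
>>>>   .option("subscribe", "words")
>>>>   .option("startingOffsets", "latest")
>>>>   .option("maxOffsetsPerTrigger", "1000")   // optional per-batch throttle
>>>>   .load()
>>>>   .selectExpr("CAST(value AS STRING) AS value")
>>>>
>>>> kafkaLines.groupBy("value").count().writeStream
>>>>   .outputMode("complete")
>>>>   .format("console")
>>>>   .option("checkpointLocation", "/tmp/wordcount-checkpoint")
>>>>   .start()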
>>>>
>>>> On Thu, May 16, 2019 at 10:45 AM Stephen Boesch <javadba@gmail.com>
>>>> wrote:
>>>>
>>>>> > The exact source RDD (or RDDs created) is determined by the Relation
>>>>> implementation of the datasource being read
>>>>>
>>>>> That is something I completely missed. In any case, I have been working
>>>>> exclusively with Kafka - which I believe (but will confirm using your
>>>>> suggestions to look at the GUI) does support retention across all batches.
>>>>> There are a lot of moving parts/variations of behavior in SSS.
>>>>>
>>>>> On Thu, May 16, 2019 at 7:25 AM Russell Spitzer <
>>>>> russell.spitzer@gmail.com> wrote:
>>>>>
>>>>>> The plan may be several RDDs, and the state itself will be stored
>>>>>> separately; you can always check this by looking at the UI when running
>>>>>> your job. The Stages tab will show you exactly which RDDs are created. The
>>>>>> exact source RDD (or RDDs created) is determined by the Relation
>>>>>> implementation of the data source being read. In the case of the socket
>>>>>> reader, it will only keep those entries for the batch window being
>>>>>> processed. So you will get 10 records for the first batch, and 5 for the
>>>>>> second. We can see all of this in the UI.
>>>>>>
>>>>>> I recommend actually booting up Spark and doing this; here is a run I
>>>>>> just did in the spark-shell in local mode.
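>>>>>>
>>>>>> (Roughly, the setup -- a sketch, not the exact session: `nc -lk 9999` in
>>>>>> one terminal to type words into, and the word-count query from the
>>>>>> programming guide in a spark-shell in another.)
>>>>>>
>>>>>> // Terminal 1:  nc -lk 9999      <- type words here
>>>>>> // Terminal 2:  spark-shell      (local mode), then:
>>>>>> val lines = spark.readStream.format("socket")
>>>>>>   .option("host", "localhost").option("port", 9999).load()
>>>>>> val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()
>>>>>> val query = wordCounts.writeStream.outputMode("complete").format("console").start()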
>>>>>>
>>>>>> Each execution, as the job runs, is translated into a job with stages:
>>>>>>
>>>>>> [image: Spark UI screenshot — jobs and stages for each batch execution]
>>>>>>
>>>>>> Inside our stage we can see the RDDs created:
>>>>>>
>>>>>> [image: Spark UI screenshot — RDDs created within the stage]
>>>>>>
>>>>>> The DataSourceRDD represents the data being added to the groupBy. That
>>>>>> data is combined with our state store in the second stage:
>>>>>>
>>>>>> [image: Spark UI screenshot — second stage combining the batch with the state store]
>>>>>>
>>>>>> As we continue typing more data into netcat, the shuffle write (the
>>>>>> amount of data passed into the grouping aggregation)
>>>>>>
>>>>>> [image: Spark UI screenshot — shuffle write per batch]
>>>>>>
>>>>>> is only equal to the number of rows read in that batch. Notice how it
>>>>>> is empty when no words are typed, larger when more words are typed, and
>>>>>> smaller when fewer words are typed.
>>>>>>
>>>>>> I encourage you to actually load this up and play around with the UI
>>>>>> to get a feel for it.
>>>>>>
>>>>>> On Thu, May 16, 2019 at 8:46 AM Sheel Pancholi <sheelstera@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Russell,
>>>>>>>
>>>>>>> Thank you so much again. That answers almost everything.
>>>>>>>
>>>>>>> I was fully aware of the example I was quoting. My interpretation of
>>>>>>> "unbounded", as I understood it from the docs, was: *"the DF for the
>>>>>>> stream acts as an unconditionally unbounded table"*, i.e. regardless of
>>>>>>> the type of operation (a transformation, whether it's a *map()* or a
>>>>>>> *reduceByKey()*) or the output mode intended upon the stream, the
>>>>>>> data/stream would continue to be accumulated like a transaction table.
>>>>>>> With that interpretation in mind, I was confused by the statements:
>>>>>>> one said *~"the stream is treated like an unbounded table"* while the
>>>>>>> other said *"source data is discarded upon processing and only minimal
>>>>>>> intermediate data is kept around"*.
>>>>>>>
>>>>>>> With your crisp explanation plus the following diagram of the Catalyst
>>>>>>> Optimizer ...
>>>>>>> [image: Catalyst Optimizer pipeline diagram]
>>>>>>>
>>>>>>> ...can I infer the following, *using the word count example from the
>>>>>>> Spark docs*?
>>>>>>> 1. If at time instant T0, 10 records arrive in the stream, then the
>>>>>>> input table is of size 10 and, correspondingly, the code generation
>>>>>>> phase will ultimately create a base RDD of 10 elements for the DF
>>>>>>> *lines*, and those will be processed.
>>>>>>> 2. At time instant T1, the stream gets another 5 records and,
>>>>>>> subsequently, because of the two important features, *complete mode*
>>>>>>> and the *groupBy() operation*, the complete state is preserved;
>>>>>>> therefore the input table is of size 15 and, during the code
>>>>>>> generation phase, *a new base RDD of 15 elements* is generated for
>>>>>>> the DF *lines*.
>>>>>>>
>>>>>>> Please confirm.
>>>>>>>
>>>>>>> Regards
>>>>>>> Sheel
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <
>>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> You are looking at the diagram without looking at the underlying
>>>>>>>> request. The behavior of state collection depends on the request and
>>>>>>>> the output mode of the query.
>>>>>>>>
>>>>>>>> In the example you cite
>>>>>>>>
>>>>>>>> val lines = spark.readStream
>>>>>>>>   .format("socket")
>>>>>>>>   .option("host", "localhost")
>>>>>>>>   .option("port", 9999)
>>>>>>>>   .load()
>>>>>>>>
>>>>>>>> // Split the lines into words
>>>>>>>> val words = lines.as[String].flatMap(_.split(" "))
>>>>>>>>
>>>>>>>> // Generate running word count
>>>>>>>> val wordCounts = words.groupBy("value").count()
>>>>>>>>
>>>>>>>> // Start running the query that prints the running counts to the console
>>>>>>>> val query = wordCounts.writeStream
>>>>>>>>   .outputMode("complete")
>>>>>>>>   .format("console")
>>>>>>>>   .start()
>>>>>>>>
>>>>>>>> query.awaitTermination()
>>>>>>>>
>>>>>>>>
>>>>>>>> This code contains two important features which cause the whole
>>>>>>>> state to be held. First, we have an aggregation: the groupBy operation
>>>>>>>> is unbounded, which means an unlimited amount of state will need to be
>>>>>>>> stored as the application proceeds. In addition, the output mode is
>>>>>>>> "complete", which means that on every batch iteration the entire set
>>>>>>>> of output results must be returned.
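>>>>>>>>
>>>>>>>> (For contrast -- a sketch, not part of the example above: with
>>>>>>>> outputMode("update") the aggregation still has to keep its state, but each
>>>>>>>> batch only emits the rows whose counts changed in that trigger, rather
>>>>>>>> than the entire result table.)
>>>>>>>>
>>>>>>>> val updateQuery = wordCounts.writeStream
>>>>>>>>   .outputMode("update")   // emit only counts updated in this batch
>>>>>>>>   .format("console")
>>>>>>>>   .start()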
>>>>>>>>
>>>>>>>> The diagram is just showing which records are processed by the
>>>>>>>> request at each point in time, along with the previous data that was
>>>>>>>> required to build the state.
>>>>>>>>
>>>>>>>> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <
>>>>>>>> sheelstera@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Along with what I sent before, I want to add that I went over the
>>>>>>>>> documentation at
>>>>>>>>> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Here is an excerpt:
>>>>>>>>>
>>>>>>>>>> [image: Model]
>>>>>>>>>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>>>>>>>>>
>>>>>>>>>> Note that Structured Streaming does not materialize the entire
>>>>>>>>>> table. It reads the latest available data from the streaming data
>>>>>>>>>> source, processes it incrementally to update the result, and then
>>>>>>>>>> discards the source data. It only keeps around the minimal
>>>>>>>>>> intermediate *state* data as required to update the result (e.g.
>>>>>>>>>> intermediate counts in the earlier example).
>>>>>>>>>>
>>>>>>>>> My question is: on one hand, the diagram shows the input table to
>>>>>>>>> truly be unbounded, with data constantly arriving into this "table".
>>>>>>>>> But, on the other hand, it also says that it discards the source
>>>>>>>>> data. Then what is the meaning of the unbounded table in the diagram
>>>>>>>>> above, showing incremental data arriving and sitting in this
>>>>>>>>> unbounded input table! Moreover, it also says that it keeps only the
>>>>>>>>> intermediate data (i.e. the intermediate counts). This sounds
>>>>>>>>> contradictory in my head.
>>>>>>>>>
>>>>>>>>> Could you please clarify what it is ultimately supposed to be?
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> Sheel
>>>>>>>>>
>>>>>>>>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <
>>>>>>>>> sheelstera@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Russell,
>>>>>>>>>>
>>>>>>>>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep
>>>>>>>>>> Dive video at https://www.youtube.com/watch?v=RmUn5vHlevc and
>>>>>>>>>> that, along with your explanation, made me realize that the
>>>>>>>>>> DataFrame is the new DStream in Structured Streaming. If my
>>>>>>>>>> understanding is correct, I request you to clarify the 2 points
>>>>>>>>>> below:
>>>>>>>>>>
>>>>>>>>>> 1. Incremental Query - Say at time instant T1 you have 10 items
>>>>>>>>>> to process, and then at time instant T2 you have 5 newer items
>>>>>>>>>> streaming in to be processed. *Structured Streaming* says that the
>>>>>>>>>> DF is treated as an unbounded table and hence 15 items will be
>>>>>>>>>> processed together. Does this mean that on Iteration 1 (i.e. time
>>>>>>>>>> instant T1) the Catalyst Optimizer, in the code generation phase,
>>>>>>>>>> creates an RDD of 10 elements, and on Iteration 2 (i.e. time
>>>>>>>>>> instant T2) the Catalyst Optimizer creates an RDD of 15 elements?
>>>>>>>>>> 2. You mentioned *"Some parts of the plan refer to static pieces
>>>>>>>>>> of data ..."* Could you elaborate a bit more on what this static
>>>>>>>>>> piece of data refers to? Are you referring to the 10 records that
>>>>>>>>>> had already arrived at T1 and are now sitting as old static data
>>>>>>>>>> in the unbounded table?
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> Sheel
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>>>>>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> DataFrames describe the calculation to be done, but the
>>>>>>>>>>> underlying implementation is an "Incremental Query". That is, the
>>>>>>>>>>> DataFrame code is executed repeatedly, with Catalyst adjusting the
>>>>>>>>>>> final execution plan on each run. Some parts of the plan refer to
>>>>>>>>>>> static pieces of data; others refer to data which is pulled in on
>>>>>>>>>>> each iteration. None of this changes the DataFrame objects
>>>>>>>>>>> themselves.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <
>>>>>>>>>>> sheelstera@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>> Structured Streaming treats a stream as an unbounded table in
>>>>>>>>>>>> the form of a DataFrame. Continuously flowing data from the
>>>>>>>>>>>> stream keeps getting added to this DataFrame (which is the
>>>>>>>>>>>> unbounded table), which warrants a change to the DataFrame and
>>>>>>>>>>>> violates the very basic nature of a DataFrame, since a DataFrame
>>>>>>>>>>>> by its nature is immutable. This sounds contradictory. Is there
>>>>>>>>>>>> an explanation for this?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Sheel
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>
>
