spark-user mailing list archives

From Stephen Boesch <java...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 16:04:52 GMT
> kafka
Is that true regardless of the outputMode, even "complete"? I have been
working with Kafka to try to understand the behavior empirically, but that
is a less than ideal approach: knowing the *intent* is a better starting
point. Thanks!

On Thu, May 16, 2019 at 08:51 Russell Spitzer <
russell.spitzer@gmail.com> wrote:

> Kafka only pulls from the last offset to the most recent (or the throttled
> next point), so it would also only be pulling in a unique set of data each
> batch. In case of failure, Spark just re-reads the batch's offset interval
> from Kafka.
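>
> To make that concrete, here is a minimal sketch (the broker address, topic
> name and checkpoint path are made up, and it assumes the spark-sql-kafka
> connector is on the classpath). The checkpoint is where the per-batch
> offset ranges are recorded, which is what makes the replay-on-failure
> behavior work:
>
> val events = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
>   .option("subscribe", "events")                        // hypothetical topic
>   .option("startingOffsets", "latest")
>   .load()
>
> // Each trigger reads only the offset range recorded for that batch in the
> // checkpoint; on failure the batch is simply re-read from the same offsets.
> val query = events.selectExpr("CAST(value AS STRING) AS value")
>   .writeStream
>   .format("console")
>   .option("checkpointLocation", "/tmp/kafka-offset-demo") // made-up path
>   .start()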
>
> On Thu, May 16, 2019 at 10:45 AM Stephen Boesch <javadba@gmail.com> wrote:
>
>> > The exact source RDD (or RDDs created) is determined by the Relation
>> > implementation of the datasource being read
>>
>> That is something I completely missed. In any case, I have been working
>> exclusively with Kafka, which I believe (but will confirm using your
>> suggestion to look at the UI) does retain data across all batches.
>> There are a lot of moving parts and variations of behavior in Structured
>> Streaming.
>>
>> On Thu, May 16, 2019 at 07:25 Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>> The plan may consist of several RDDs, and the state itself will be stored
>>> separately; you can always check this by looking at the UI when running
>>> your job. The Stages tab will show you exactly what RDDs are created. The
>>> exact source RDD (or RDDs created) is determined by the Relation
>>> implementation of the datasource being read. In the case of the socket
>>> reader, it will only keep those entries for the batch window being
>>> processed. So you will get 10 records for the first batch, and 5 for the
>>> second. We can see all of this in the UI.
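>>>
>>> As a quick way to see this without the UI, here is a minimal sketch using
>>> the query handle from the word-count example quoted further down in this
>>> thread (it only prints something useful once at least one batch has run).
>>> For a stateful aggregation the physical plan typically shows the per-batch
>>> source scan plus the state store operators that restore and save the
>>> running counts:
>>>
>>> // Print the plan of the most recent trigger of the running StreamingQuery.
>>> query.explain()
>>>
>>> // Logical and physical plans together.
>>> query.explain(extended = true)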
>>>
>>> I recommend actually booting up Spark and doing this; here is a run I
>>> just did in the spark-shell in local mode.
>>>
>>> Each batch execution, as the query runs, is translated into a job with
>>> stages.
>>>
>>>
>>> [image: image.png]
>>>
>>>
>>> Inside our stage we can see the RDD's Created
>>>
>>> [image: image.png]
>>>
>>> The DataSource RDD represents the data being added to the groupBy.
>>>
>>> This data is combined with our state store in the second stage.
>>>
>>> [image: image.png]
>>>
>>> As we continue typing more data into netcat, the shuffle write (the amount
>>> of data passed into the grouping aggregation)
>>>
>>> [image: image.png]
>>>
>>> is only equal to the number of rows read in that batch. Notice how it is
>>> empty when no words are typed, larger when more words are typed, and
>>> smaller when fewer are typed.
>>>
>>> I encourage you to actually load this up and play around with the UI to
>>> get a feel for it.
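>>>
>>> If you prefer code to clicking through the UI, a minimal sketch of the
>>> same check (again using the word-count query handle; lastProgress is null
>>> until the first batch completes): numInputRows reflects only the rows read
>>> in that trigger, 10 for the first batch and 5 for the second in the
>>> example above.
>>>
>>> // Per-batch metrics kept for the most recent triggers.
>>> query.recentProgress.foreach { p =>
>>>   println(s"batch=${p.batchId} inputRows=${p.numInputRows}")
>>> }
>>>
>>> // Full metrics of the latest batch as JSON.
>>> println(query.lastProgress.prettyJson)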
>>>
>>> On Thu, May 16, 2019 at 8:46 AM Sheel Pancholi <sheelstera@gmail.com>
>>> wrote:
>>>
>>>> Hello Russell,
>>>>
>>>> Thank you so much again. That answers almost everything.
>>>>
>>>> I was fully aware of the example I was quoting. My interpretation of
>>>> "unbounded", as I understood it from the docs, was: *"the DF for the
>>>> stream acts as an unconditionally unbounded table"*, i.e. regardless of
>>>> the type of operation (a transformation, whether it is a *map()* or a
>>>> *reduceByKey()*) or the output mode intended upon the stream, the
>>>> data/stream would continue to accumulate like a transaction table. With
>>>> that interpretation in mind, I was confused by the statements: one says
>>>> *~"the stream is treated like an unbounded table"*, while the other says
>>>> *"source data is discarded upon processing and only minimal intermediate
>>>> data is kept around"*.
>>>>
>>>> With your crisp explanation + the following diagram of the Catalyst
>>>> Optimizer ...
>>>> [image: image.png]
>>>>
>>>> ...can I infer the following, *using the word count example from the
>>>> Spark Docs*?
>>>> 1. If, at time instant T0, 10 records arrive in the stream, then the
>>>> input table is of size 10 and, correspondingly, the code generation
>>>> phase will ultimately create a base RDD of 10 elements for the DF
>>>> *lines*, and those will be processed.
>>>> 2. At time instant T1, the stream gets another 5 records and, because of
>>>> the two important features, *complete mode* and the *groupBy()
>>>> operation*, the complete state is preserved; therefore the input table is
>>>> of size 15 and, during the code generation phase, *a new base RDD of 15
>>>> elements* is generated for the DF *lines*.
>>>>
>>>> Please confirm.
>>>>
>>>> Regards
>>>> Sheel
>>>>
>>>>
>>>>
>>>> On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <
>>>> russell.spitzer@gmail.com> wrote:
>>>>
>>>>>
>>>>> You are looking at the diagram without looking at the underlying
>>>>> request. The behavior of state collection is dependent on the request
>>>>> and the output mode of the query.
>>>>>
>>>>> In the example you cite
>>>>>
>>>>> val lines = spark.readStream
>>>>>   .format("socket")
>>>>>   .option("host", "localhost")
>>>>>   .option("port", 9999)
>>>>>   .load()
>>>>>
>>>>> // Split the lines into words
>>>>> val words = lines.as[String].flatMap(_.split(" "))
>>>>>
>>>>> // Generate running word count
>>>>> val wordCounts = words.groupBy("value").count()
>>>>>
>>>>> // Start running the query that prints the running counts to the console
>>>>> val query = wordCounts.writeStream
>>>>>   .outputMode("complete")
>>>>>   .format("console")
>>>>>   .start()
>>>>>
>>>>> query.awaitTermination()
>>>>>
>>>>>
>>>>> This code contains two important features which cause the whole state
>>>>> to be held. First, we have an aggregation: the groupBy operation is
>>>>> unbounded. This means an unlimited amount of state will be required to
>>>>> be stored as the application proceeds. In addition, the output mode is
>>>>> "complete". This means that on every batch iteration the entire set of
>>>>> output results must be returned.
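>>>>>
>>>>> For contrast, a minimal sketch of the same aggregation with the "update"
>>>>> output mode (the variable name is made up): the sink then receives only
>>>>> the groups whose counts changed in that trigger rather than the full
>>>>> result table. The aggregation state is still kept across batches either
>>>>> way; only the amount of output per trigger differs.
>>>>>
>>>>> // Same wordCounts aggregation, but emit only the rows updated per batch.
>>>>> val updateQuery = wordCounts.writeStream
>>>>>   .outputMode("update")
>>>>>   .format("console")
>>>>>   .start()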
>>>>>
>>>>> The diagram is just showing which records are processed by the request
>>>>> at each point in time, along with the previous data that was required to
>>>>> build the state.
>>>>>
>>>>> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <sheelstera@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Along with what I sent before, I want to add that I went over the
>>>>>> documentation at
>>>>>> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>>>>>>
>>>>>>
>>>>>> Here is an excerpt:
>>>>>>
>>>>>> [image: Model]
>>>>>>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>>>>>>
>>>>>>> Note that Structured Streaming does not materialize the entire table.
>>>>>>> It reads the latest available data from the streaming data source,
>>>>>>> processes it incrementally to update the result, and then discards the
>>>>>>> source data. It only keeps around the minimal intermediate *state* data
>>>>>>> as required to update the result (e.g. intermediate counts in the
>>>>>>> earlier example).
>>>>>>>
>>>>>> My question is: on the one hand, the diagram shows the input table to
>>>>>> truly be unbounded, with data constantly arriving into this "table".
>>>>>> But, on the other hand, it also says that it discards the source data.
>>>>>> Then what is the meaning of the unbounded table in the diagram above,
>>>>>> showing incremental data arriving and sitting in this unbounded input
>>>>>> table? Moreover, it also says that it keeps only the intermediate data
>>>>>> (i.e. the intermediate counts). This sounds somewhat contradictory in
>>>>>> my head.
>>>>>>
>>>>>> Could you please clarify what it is ultimately supposed to be?
>>>>>>
>>>>>> Regards
>>>>>> Sheel
>>>>>>
>>>>>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <sheelstera@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Russell,
>>>>>>>
>>>>>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive
>>>>>>> video at https://www.youtube.com/watch?v=RmUn5vHlevc and that, along
>>>>>>> with your explanation, made me realize that the DataFrame is the new
>>>>>>> DStream in Structured Streaming. If my understanding is correct, I
>>>>>>> request you to clarify the 2 points below:
>>>>>>>
>>>>>>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>>>>>>> process, and then at time instant T2 you have 5 newer items streaming
>>>>>>> in to be processed. *Structured Streaming* says that the DF is treated
>>>>>>> as an unbounded table and hence 15 items will be processed together.
>>>>>>> Does this mean that on Iteration 1 (i.e. time instant T1) the Catalyst
>>>>>>> Optimizer, in the code generation phase, creates an RDD of 10 elements,
>>>>>>> and on Iteration 2 (i.e. time instant T2) the Catalyst Optimizer
>>>>>>> creates an RDD of 15 elements?
>>>>>>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>>>>>>> data ..."* Could you elaborate a bit more on what this static piece of
>>>>>>> data refers to? Are you referring to the 10 records that had already
>>>>>>> arrived at T1 and are now sitting as old static data in the unbounded
>>>>>>> table?
>>>>>>>
>>>>>>> Regards
>>>>>>> Sheel
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>>
>>>>>>>> Dataframes describe the calculation to be done, but the underlying
>>>>>>>> implementation is an "Incremental Query". That is, the dataframe code
>>>>>>>> is executed repeatedly, with Catalyst adjusting the final execution
>>>>>>>> plan on each run. Some parts of the plan refer to static pieces of
>>>>>>>> data, others refer to data which is pulled in on each iteration. None
>>>>>>>> of this changes the DataFrame objects themselves.
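>>>>>>>>
>>>>>>>> A minimal sketch of that distinction (the path and join column are
>>>>>>>> made up, and it uses the built-in "rate" test source): the static side
>>>>>>>> is resolved once and appears unchanged in every incremental plan,
>>>>>>>> while the streaming side contributes only the rows pulled in for that
>>>>>>>> trigger. Neither DataFrame object is mutated; Catalyst simply re-plans
>>>>>>>> each batch.
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions.lit
>>>>>>>>
>>>>>>>> // Static piece of the plan: read once from a (made-up) parquet path.
>>>>>>>> val countries = spark.read.parquet("/tmp/countries")
>>>>>>>>
>>>>>>>> // Streaming piece: new rows are pulled in on every trigger.
>>>>>>>> val clicks = spark.readStream
>>>>>>>>   .format("rate")
>>>>>>>>   .load()
>>>>>>>>   .withColumn("countryCode", lit("US")) // hypothetical join key
>>>>>>>>
>>>>>>>> // The joined DataFrame itself never changes; only the incremental
>>>>>>>> // plan is re-executed for each batch.
>>>>>>>> val enriched = clicks.join(countries, Seq("countryCode"))
>>>>>>>> val q = enriched.writeStream.format("console").start()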
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <
>>>>>>>> sheelstera@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>> Structured Streaming treats a stream as an unbounded table in the
>>>>>>>>> form of a DataFrame. Continuously flowing data from the stream keeps
>>>>>>>>> getting added to this DataFrame (which is the unbounded table), which
>>>>>>>>> warrants a change to the DataFrame; this violates the very basic
>>>>>>>>> nature of a DataFrame, since a DataFrame is by nature immutable. This
>>>>>>>>> sounds contradictory. Is there an explanation for this?
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> Sheel
>>>>>>>>>
>>>>>>>>
>>>>>>>
