spark-user mailing list archives

From Stephen Boesch <java...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 15:46:10 GMT
> The exact source RDD (or RDDs created) is determined by the Relation
> implementation of the datasource being read.

That is something I completely missed. In any case, I have been working
exclusively with Kafka - which I believe (but will confirm using your
suggestion to look at the UI) does support retention across all batches.
There are a lot of moving parts and variations of behavior in Structured
Streaming.
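
For reference, a minimal sketch of the kind of Kafka source I mean (the
broker address and topic name here are placeholders, not my actual config):

  val kafkaLines = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  // placeholder broker
    .option("subscribe", "some-topic")                 // placeholder topic
    .load()
    .selectExpr("CAST(value AS STRING)")               // Kafka values arrive as bytes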

Am Do., 16. Mai 2019 um 07:25 Uhr schrieb Russell Spitzer <
russell.spitzer@gmail.com>:

> The plan may involve several RDDs, and the state itself will be stored
> separately; you can always check this by looking at the UI when running
> your job. The Stages tab will show you exactly which RDDs are created. The
> exact source RDD (or RDDs created) is determined by the Relation
> implementation of the datasource being read. In the case of the socket
> reader, it will only keep those entries for the batch window being
> processed. So you will get 10 records for the first batch, and 5 for the
> second. We can see all of this in the UI.
>
> I recommend actually booting up Spark and doing this; here is a run I just
> did in the spark-shell in local mode.
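>
> (To follow along - assuming a local install and netcat: run "nc -lk 9999"
> in one terminal as the socket server, start spark-shell in another, paste
> in the word-count code quoted further down this thread, and open the UI at
> http://localhost:4040.)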
>
> Each micro-batch execution is translated into a job with stages:
>
>
> [image: image.png]
>
>
> Inside our stage we can see the RDDs created:
>
> [image: image.png]
>
> The DataSource RDD represents the data being added to the groupBy.
>
> This data is combined with our state store in the second stage:
>
> [image: image.png]
>
> As we continue typing more data into netcat, the shuffle write (the amount
> of data passed into the grouping aggregation)
>
> [image: image.png]
>
> is only equal to the number of rows read in that batch. Notice how it is
> empty when no words are typed, contains more bytes when more words are
> typed, and fewer when there are fewer words.
>
> I encourage you to actually load this up and play around with the UI to
> get a feel for it.
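>
> Besides the UI, you can also print the most recent physical plan from the
> shell. A one-liner, using the query handle from the word-count example
> quoted below:
>
>   query.explain()   // prints the last executed micro-batch plan, including
>                     // the StateStoreRestore/StateStoreSave operators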
>
> On Thu, May 16, 2019 at 8:46 AM Sheel Pancholi <sheelstera@gmail.com>
> wrote:
>
>> Hello Russell,
>>
>> Thank you so much again. That answers almost everything.
>>
>> I was fully aware of the example I was quoting. My interpretation of
>> "unbounded", as I understood it from the docs, was: *"the DF for the
>> stream acts as an unconditionally unbounded table"*, i.e. regardless of
>> the type of operation (whether the transformation is a *map()* or a
>> *reduceByKey()*) or the output mode intended upon the stream, the
>> data/stream would continue to be accumulated like a transaction table.
>> With that interpretation in mind, I was confused by the statements: on
>> one hand the docs say *~"the stream is treated like an unbounded table"*,
>> and on the other hand they say *"source data is discarded upon
>> processing and only minimal intermediate data is kept around"*.
>>
>> With your crisp explanation + the following diagram of the Catalyst
>> Optimizer ...
>> [image: image.png]
>>
>> ...can I infer the following, *using the example of word count from the
>> Spark docs*?
>> 1. If at time instant T0, 10 records arrive in the stream, then the input
>> table is of size 10 and, correspondingly, the code generation phase will
>> ultimately create a base RDD of 10 elements for the DF *lines*, and
>> those will be processed.
>> 2. At time instant T1, the stream gets another 5 records; subsequently,
>> because of the two important features - *complete mode* and the
>> *groupBy() operation* - the complete state is preserved, so the input
>> table is of size 15, and therefore, during the code generation phase, *a
>> new base RDD of 15 elements* is generated for the DF *lines*.
>>
>> Please confirm.
>>
>> Regards
>> Sheel
>>
>>
>>
>> On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>>
>>> You are looking at the diagram without looking at the underlying query.
>>> The behavior of state collection depends on the query and its output
>>> mode.
>>>
>>> In the example you cite
>>>
>>> val lines = spark.readStream
>>>   .format("socket")
>>>   .option("host", "localhost")
>>>   .option("port", 9999)
>>>   .load()
>>>
>>> // Split the lines into words
>>> val words = lines.as[String].flatMap(_.split(" "))
>>>
>>> // Generate running word count
>>> val wordCounts = words.groupBy("value").count()
>>>
>>> // Start running the query that prints the running counts to the console
>>> val query = wordCounts.writeStream
>>>   .outputMode("complete")
>>>   .format("console")
>>>   .start()
>>>
>>> query.awaitTermination()
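>>>
>>> (To feed this query, a netcat server must already be listening on that
>>> port before the query starts - e.g. run "nc -lk 9999" in a separate
>>> terminal and type words into it.)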
>>>
>>>
>>> This code contains two important features which cause the whole state to
>>> be held. First, we have an aggregation: the groupBy operation is
>>> unbounded, which means an unlimited amount of state will be required to
>>> be stored as the application proceeds. In addition, the output mode is
>>> "complete", which means the entire set of output results must be
>>> returned on every batch iteration.
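>>>
>>> For contrast (a sketch, not part of the guide's example): with the
>>> "update" output mode the same aggregation state is kept, but only the
>>> rows whose counts changed in the current batch are emitted to the sink:
>>>
>>> val query = wordCounts.writeStream
>>>   .outputMode("update")   // emit only rows updated in this batch
>>>   .format("console")
>>>   .start()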
>>>
>>> The diagram is just showing which records are processed by the query at
>>> each point in time, along with the previous data that was required to
>>> build the state.
>>>
>>> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <sheelstera@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Along with what I sent before, I want to add that I went over the
>>>> documentation at
>>>> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>>>>
>>>>
>>>> Here is an excerpt:
>>>>
>>>> [image: Model]
>>>>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>>>>
>>>>> Note that Structured Streaming does not materialize the entire table.
>>>>> It reads the latest available data from the streaming data source,
>>>>> processes it incrementally to update the result, and then discards the
>>>>> source data. It only keeps around the minimal intermediate *state* data
>>>>> as required to update the result (e.g. intermediate counts in the earlier
>>>>> example).
>>>>>
>>>> My question is: on one hand, the diagram shows the input table to be
>>>> truly unbounded, with data constantly arriving into this "table". But,
>>>> on the other hand, it also says that it discards the source data. Then
>>>> what is the meaning of the unbounded table in the diagram, showing
>>>> incremental data arriving and sitting in this unbounded input table?
>>>> Moreover, it also says that it keeps only the intermediate data (i.e.
>>>> the intermediate counts). This sounds contradictory in my head.
>>>>
>>>> Could you please clarify what it is ultimately supposed to be?
>>>>
>>>> Regards
>>>> Sheel
>>>>
>>>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <sheelstera@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Russell,
>>>>>
>>>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive
>>>>> video at https://www.youtube.com/watch?v=RmUn5vHlevc and that, along
>>>>> with your explanation, made me realize that the DataFrame is the new
>>>>> DStream in Structured Streaming. If my understanding is correct, I
>>>>> request you to clarify the two points below:
>>>>>
>>>>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>>>>> process, and then at time instant T2 you have 5 newer items streaming
>>>>> in to be processed. *Structured Streaming* says that the DF is treated
>>>>> as an unbounded table and hence 15 items will be processed together.
>>>>> Does this mean that on Iteration 1 (i.e. time instant T1) the Catalyst
>>>>> Optimizer in the code generation phase creates an RDD of 10 elements,
>>>>> and on Iteration 2 (i.e. time instant T2) it creates an RDD of 15
>>>>> elements?
>>>>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>>>>> data ..."* Could you elaborate a bit more on what this static piece of
>>>>> data refers to? Are you referring to the 10 records that had already
>>>>> arrived at T1 and are now sitting as old static data in the unbounded
>>>>> table?
>>>>>
>>>>> Regards
>>>>> Sheel
>>>>>
>>>>>
>>>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>>>>> russell.spitzer@gmail.com> wrote:
>>>>>
>>>>>> Dataframes describe the calculation to be done, but the underlying
>>>>>> implementation is an "Incremental Query". That is, the DataFrame code
>>>>>> is executed repeatedly, with Catalyst adjusting the final execution
>>>>>> plan on each run. Some parts of the plan refer to static pieces of
>>>>>> data, others refer to data which is pulled in on each iteration. None
>>>>>> of this changes the DataFrame objects themselves.
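>>>>>>
>>>>>> A sketch of the idea, using the word-count names from the example
>>>>>> quoted earlier in this thread - the streaming DataFrame is an
>>>>>> immutable plan, not a growing buffer:
>>>>>>
>>>>>> // Each transformation returns a new, immutable Dataset:
>>>>>> val words = lines.as[String].flatMap(_.split(" "))
>>>>>> val wordCounts = words.groupBy("value").count()
>>>>>> // Starting a query executes this same plan once per micro-batch;
>>>>>> // Catalyst re-plans the physical execution, but these objects never change.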
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <sheelstera@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>> Structured Streaming treats a stream as an unbounded table in the
>>>>>>> form of a DataFrame. Continuously flowing data from the stream keeps
>>>>>>> getting added to this DataFrame (which is the unbounded table), which
>>>>>>> warrants a change to the DataFrame and violates the very basic nature
>>>>>>> of a DataFrame, since a DataFrame by its nature is immutable. This
>>>>>>> sounds contradictory. Is there an explanation for this?
>>>>>>>
>>>>>>> Regards
>>>>>>> Sheel
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>> --
>>
>> Best Regards,
>>
>> Sheel Pancholi
>>
>> *Mob: +91 9620474620*
>>
>> *Connect with me on: Twitter <https://twitter.com/sheelstera> | LinkedIn
>> <http://in.linkedin.com/in/sheelstera>*
>>
>> *Write to me at: Sheel@Yahoo!! <sheelpancholi@yahoo.com> | Sheel@Gmail!!
>> <sheelstera@gmail.com> | Sheel@Windows Live <sheelstera@live.com>*
>>
>> *Save a tree* - please do not print this email unless you really need
>> to!
>>
>
