spark-user mailing list archives

From Russell Spitzer <>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 14:24:43 GMT
The plan may consist of several RDDs, and the state itself is stored
separately. You can always check this by looking at the UI while your job
is running: the Stages tab shows exactly which RDDs are created. The
exact source RDD (or RDDs) is determined by the Relation
implementation of the data source being read. In the case of the socket
reader, it keeps only the entries for the batch window being
processed, so you will get 10 records for the first batch and 5 for the
second. We can see all of this in the UI.
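
The 10-then-5 behaviour can be sketched in plain Scala, without Spark. This is only a conceptual model and not how Spark implements its state store; `MicroBatchModel`, `processBatch`, and `State` are hypothetical names invented for this illustration. Each call sees only the new batch plus the state carried over from earlier batches.

```scala
// Conceptual model only: this is NOT Spark's implementation.
// MicroBatchModel, processBatch and State are hypothetical names.
object MicroBatchModel {
  // Running word counts kept between batches (stands in for the state store).
  type State = Map[String, Long]

  // Processes only the records of the current batch and merges their
  // counts into the state carried over from earlier batches.
  // Returns the updated state and the number of input records read.
  def processBatch(state: State, batch: Seq[String]): (State, Int) = {
    val words = batch.flatMap(_.split(" ")).filter(_.nonEmpty)
    val updated = words.foldLeft(state) { (acc, w) =>
      acc.updated(w, acc.getOrElse(w, 0L) + 1L)
    }
    (updated, batch.size)
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq.fill(10)("spark")  // 10 records arrive first
    val batch2 = Seq.fill(5)("stream")  // 5 more records arrive later

    val (s1, read1) = processBatch(Map.empty, batch1)
    val (s2, read2) = processBatch(s1, batch2)

    println(s"batch 1 read $read1 records, state = $s1")
    println(s"batch 2 read $read2 records, state = $s2")
  }
}
```

In this model the second call reads 5 records, not 15. The earlier 10 records are gone, and only their aggregated count survives in the state.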

I recommend actually booting up Spark and trying this. Here is a run I just
did in the Spark shell in local mode.

Each execution of the query is translated into a Spark job with stages:

[image: image.png]

Inside our stage we can see the RDDs created:

[image: image.png]

The DataSource RDD represents the data being added to the groupBy.

This data is combined with our state store in the second stage:

[image: image.png]

As we continue typing more data into netcat, the shuffle write (the amount of
data passed into the grouping aggregation)

[image: image.png]

is only equal to the number of rows read in that batch. Notice how it is
empty when no words are typed, larger when more words are typed, and
smaller when fewer words are typed.

I encourage you to actually load this up and play around with the UI to get
a feel for it.
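
The same numbers can also be read programmatically from the query's progress reports. The following is a minimal sketch to paste into the Spark shell. `printBatchSizes` is a hypothetical helper name, not part of Spark, and it assumes `query` is the running `StreamingQuery` from the word count example quoted further down.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper (not part of Spark): for each recent micro-batch,
// print how many rows were read and how many rows the state operators hold.
def printBatchSizes(query: StreamingQuery): Unit =
  query.recentProgress.foreach { p =>
    // One entry per stateful operator; the word count has a single aggregation.
    val stateRows = p.stateOperators.map(_.numRowsTotal).sum
    println(s"batch=${p.batchId} inputRows=${p.numInputRows} stateRows=$stateRows")
  }
```

Calling `printBatchSizes(query)` after typing a few lines into netcat, instead of blocking on `query.awaitTermination()`, should show `inputRows` following only what was typed in each batch, while `stateRows` reflects the number of distinct words seen so far.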

On Thu, May 16, 2019 at 8:46 AM Sheel Pancholi <> wrote:

> Hello Russell,
> Thank you so much again. That answers almost everything.
> I was fully aware of the example I was quoting. My interpretation of
> "unbounded" as I understood from the docs was: *"the DF for the stream
> acts as an unconditionally unbounded table"* i.e. regardless of the type of
> operation (a transformation, whether it's a *map()* or a *reduceByKey()*) or
> output mode intended upon the stream, the data/stream would continue to be
> accumulated like a transaction table. With that interpretation in mind,
> there was a confusion with the statements where one statement said *~"the
> stream is treated like an unbounded table"* and on the other hand it said *"source
> data is discarded upon processing and only keeps around minimal
> intermediate data". *
> With your crisp explanation + the following diagram of the Catalyst
> Optimizer ...
> [image: image.png]
> ...can I infer the following *using the example of word count from the
> Spark Docs*?:
> 1. If at time instant T0, 10 records arrive in the stream, then the input
> table is of size 10 and correspondingly, the code generation phase will
> ultimately create a base RDD for 10 elements for the DF *lines* and those
> will be processed
> 2. At time instant T1, the stream gets another 5 records, and,
> subsequently, because of the two important features: *complete mode* and *groupBy()
> operation, *the complete state is preserved and therefore the input table
> is of size 15 and therefore, during the code generation phase, *a new
> base RDD for 15 elements* is generated for the DF *lines*.
> Please confirm.
> Regards
> Sheel
> On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <>
> wrote:
>> You are looking at the diagram without looking at the underlying request.
>> The behavior of state collection is dependent on the request and the output
>> mode of the query.
>> In the example you cite
>> import spark.implicits._  // needed for .as[String]
>>
>> // Create a DataFrame of lines read from a socket on localhost:9999
>> val lines = spark.readStream
>>   .format("socket")
>>   .option("host", "localhost")
>>   .option("port", 9999)
>>   .load()
>>
>> // Split the lines into words
>> val words = lines.as[String].flatMap(_.split(" "))
>>
>> // Generate running word count
>> val wordCounts = words.groupBy("value").count()
>>
>> // Start running the query that prints the running counts to the console
>> val query = wordCounts.writeStream
>>   .outputMode("complete")
>>   .format("console")
>>   .start()
>>
>> query.awaitTermination()
>> This code contains two important features which cause the whole state to
>> be held. First we have an aggregation. The groupBy operation is unbounded.
>> This means an unlimited amount of state will be required to be stored as
>> the application proceeds. In addition the output mode is "complete". This
>> means on every batch iteration the entire set of output results must be
>> returned.
>> The diagram is just showing which records are processed by the request at each
>> time, along with the previous data that was required to build the state.
>> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <>
>> wrote:
>>> Hello,
>>> Along with what I sent before, I want to add that I went over the
>>> documentation at
>>> Here is an excerpt:
>>> [image: Model]
>>>> Note that Structured Streaming does not materialize the entire table.
>>>> It reads the latest available data from the streaming data source,
>>>> processes it incrementally to update the result, and then discards the
>>>> source data. It only keeps around the minimal intermediate *state* data
>>>> as required to update the result (e.g. intermediate counts in the earlier
>>>> example).
>>> My question is: on one hand, the diagram shows the input table as truly
>>> unbounded, with data constantly arriving into this "table". But,
>>> on the other hand, it also says that it discards the source data. Then
>>> what is the meaning of the unbounded table in the diagram above, which shows
>>> incremental data arriving and sitting in this unbounded input table?
>>> Moreover, it also says that it keeps only the intermediate data (i.e. the
>>> intermediate counts). This sounds contradictory in my head.
>>> Could you please clarify what it is ultimately supposed to be?
>>> Regards
>>> Sheel
>>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <>
>>> wrote:
>>>> Hello Russell,
>>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive
>>>> video at and that along
>>>> with your explanation made me realize that the DataFrame is the new
>>>> DStream in Structured Streaming. If my understanding is correct, request
>>>> you to clarify the 2 points below:
>>>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>>>> process, and then at time instant T2 you have 5 newer items streaming in
>>>> to be processed. *Structured Streaming* says that the DF is treated as an
>>>> unbounded table and hence 15 items will be processed together. Does this
>>>> mean that on Iteration 1 (i.e. time instant T1) the Catalyst Optimizer in
>>>> the code generation phase creates an RDD of 10 elements and on Iteration
>>>> 2 (i.e. time instant T2), the Catalyst Optimizer creates an RDD of 15
>>>> elements?
>>>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>>>> data ..."* Could you elaborate a bit more on what this static
>>>> piece of data refers to? Are you referring to the 10 records that had
>>>> already arrived at T1 and are now sitting as old static data in the
>>>> unbounded table?
>>>> Regards
>>>> Sheel
>>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <>
>>>> wrote:
>>>>> DataFrames describe the calculation to be done, but the underlying
>>>>> implementation is an "Incremental Query". That is, the DataFrame
>>>>> is executed repeatedly, with Catalyst adjusting the final execution plan
>>>>> on each run. Some parts of the plan refer to static pieces of data; others
>>>>> refer to data which is pulled in on each iteration. None of this changes
>>>>> the DataFrame objects themselves.
>>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <>
>>>>> wrote:
>>>>>> Hi
>>>>>> Structured Streaming treats a stream as an unbounded table in the
>>>>>> form of a DataFrame. Continuously flowing data from the stream keeps
>>>>>> getting added to this DataFrame (which is the unbounded table), which
>>>>>> warrants a change to the DataFrame, which violates the very basic nature of
>>>>>> a DataFrame since a DataFrame by its nature is immutable. This sounds
>>>>>> contradictory. Is there an explanation for this?
>>>>>> Regards
>>>>>> Sheel
>>>> --
>>>> Best Regards,
>>>> Sheel Pancholi
>>>> *Mob: +91 9620474620*
>>>> *Connect with me on: Twitter <> |
>>>> LinkedIn <>*
>>>> *Write to me at:Sheel@Yahoo!! <> | Sheel@Gmail!!
>>>> <> | Sheel@Windows Live <>*
>>>> P *Save a tree* - please do not print this email unless you really
>>>> need to!
