spark-user mailing list archives

From Russell Spitzer <russell.spit...@gmail.com>
Subject Re: Are Spark Dataframes mutable in Structured Streaming?
Date Thu, 16 May 2019 15:51:21 GMT
Kafka only pulls from the last committed offset up to the most recent one (or
the throttled next point), so each batch also pulls in a unique set of data.
In case of failure, Spark just re-reads that batch's offset range from Kafka.
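
A minimal sketch of that setup (the broker address and topic name here are
illustrative): the Kafka source records the offset range for each micro-batch,
so each batch consumes a unique slice of the stream and the same slice can be
re-read after a failure.

// In the spark shell, spark is already available as a SparkSession.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker
  .option("subscribe", "events")                       // illustrative topic
  .load()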

On Thu, May 16, 2019 at 10:45 AM Stephen Boesch <javadba@gmail.com> wrote:

> > The exact source RDD (or RDDs created) is determined by the Relation
> > implementation of the datasource being read
>
> That is something I completely missed. In any case, I have been working
> exclusively with Kafka - which I believe (but will confirm using your
> suggestions to look at the GUI) does support retention across all batches.
> There are a lot of moving parts/variations of behavior in Structured Streaming.
>
> On Thu, May 16, 2019 at 07:25, Russell Spitzer <
> russell.spitzer@gmail.com> wrote:
>
>> The plan may be several RDDs, and the state itself will be stored
>> separately; you can always check this by looking at the UI when running
>> your job. The Stages tab will show you exactly which RDDs are created. The
>> exact source RDD (or RDDs created) is determined by the Relation
>> implementation of the datasource being read. In the case of the socket
>> reader, it will only keep those entries for the batch window being
>> processed. So you will get 10 records for the first batch, and 5 for the
>> second. We can see all of this in the UI.
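>>
>> As a quick check (a sketch; query stands for the StreamingQuery handle
>> returned by writeStream...start()), you can also print the physical plan
>> of the most recently executed batch, which names the source scan and the
>> stateful aggregation:
>>
>> // Prints the physical plan of the last executed micro-batch
>> query.explain()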
>>
>> I recommend actually booting up Spark and doing this; here is a run I
>> just did in the spark shell in local mode.
>>
>> Each batch execution as the query runs is translated into a job with stages
>>
>>
>> [image: image.png]
>>
>>
>> Inside our stage we can see the RDDs created
>>
>> [image: image.png]
>>
>> The DataSource RDD represents the data being added to the groupBy
>>
>> This data is then combined with our state store in the second stage
>>
>> [image: image.png]
>>
>> As we continue typing more data into netcat, the shuffle write (amount of
>> data passed into the grouping aggregation)
>>
>> [image: image.png]
>>
>> is only equal to the number of rows read in that batch. Notice how it is
>> empty when no words are typed, larger when more words are typed, and
>> smaller when fewer are.
>>
>> I encourage you to actually load this up and play around with the UI to
>> get a feel for it.
>>
>> On Thu, May 16, 2019 at 8:46 AM Sheel Pancholi <sheelstera@gmail.com>
>> wrote:
>>
>>> Hello Russell,
>>>
>>> Thank you so much again. That answers almost everything.
>>>
>>> I was fully aware of the example I was quoting. My interpretation of
>>> "unbounded", as I understood it from the docs, was: *"the DF for the stream
>>> acts as an unconditionally unbounded table"*, i.e. regardless of the type
>>> of operation (a transformation, whether it's a *map()* or a
>>> *reduceByKey()*) or the output mode intended upon the stream, the
>>> data/stream would continue to be accumulated like a transaction table. With
>>> that interpretation in mind, I was confused by the statements
>>> where one said *~"the stream is treated like an unbounded
>>> table"* while the other said *"source data is discarded upon
>>> processing and only keeps around minimal intermediate data".*
>>>
>>> With your crisp explanation + the following diagram of the Catalyst
>>> Optimizer ...
>>> [image: image.png]
>>>
>>> ...can I infer the following, *using the word count example from the
>>> Spark docs*?
>>> 1. If at time instant T0, 10 records arrive in the stream, then the
>>> input table is of size 10 and, correspondingly, the code generation phase
>>> will ultimately create a base RDD of 10 elements for the DF *lines*,
>>> and those will be processed.
>>> 2. At time instant T1, the stream gets another 5 records; because of the
>>> two important features (*complete mode* and the *groupBy() operation*)
>>> the complete state is preserved, so the input table is of size 15 and,
>>> during the code generation phase, *a new base RDD of 15 elements* is
>>> generated for the DF *lines*.
>>>
>>> Please confirm.
>>>
>>> Regards
>>> Sheel
>>>
>>>
>>>
>>> On Thu, May 16, 2019 at 6:33 PM Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>>
>>>> You are looking at the diagram without looking at the underlying
>>>> query. The behavior of state collection is dependent on the query and
>>>> its output mode.
>>>>
>>>> In the example you cite
>>>>
>>>> val lines = spark.readStream
>>>>   .format("socket")
>>>>   .option("host", "localhost")
>>>>   .option("port", 9999)
>>>>   .load()
>>>>
>>>> // Split the lines into words
>>>> val words = lines.as[String].flatMap(_.split(" "))
>>>>
>>>> // Generate running word count
>>>> val wordCounts = words.groupBy("value").count()
>>>>
>>>> // Start running the query that prints the running counts to the console
>>>> val query = wordCounts.writeStream
>>>>   .outputMode("complete")
>>>>   .format("console")
>>>>   .start()
>>>>
>>>> query.awaitTermination()
>>>>
>>>>
>>>> This code contains two important features which cause the whole state
>>>> to be held. First, we have an aggregation: the groupBy operation is
>>>> unbounded, which means an unlimited amount of state must be stored as
>>>> the application proceeds. In addition, the output mode is "complete",
>>>> which means that on every batch iteration the entire set of output
>>>> results must be returned.
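>>>>
>>>> By contrast, a minimal sketch (assuming the stream also carries a
>>>> "timestamp" column, which the word count example above does not have):
>>>> a watermark plus a windowed aggregation in append mode lets Spark drop
>>>> state for windows that can no longer change, so the state stays bounded.
>>>>
>>>> import org.apache.spark.sql.functions.window
>>>> import spark.implicits._ // for the $ column syntax
>>>>
>>>> // Data more than 10 minutes behind the watermark is dropped, and
>>>> // finalized windows are evicted from the state store.
>>>> val windowedCounts = words
>>>>   .withWatermark("timestamp", "10 minutes")
>>>>   .groupBy(window($"timestamp", "10 minutes"), $"value")
>>>>   .count()
>>>>
>>>> // Append mode emits each window only once it is complete.
>>>> val boundedQuery = windowedCounts.writeStream
>>>>   .outputMode("append")
>>>>   .format("console")
>>>>   .start()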
>>>>
>>>> The diagram is just showing which records are processed by the query at
>>>> each time, along with the previous data that was required to build the
>>>> state.
>>>>
>>>> On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi <sheelstera@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Along with what I sent before, I want to add that I went over the
>>>>> documentation at
>>>>> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>>>>>
>>>>>
>>>>> Here is an excerpt:
>>>>>
>>>>> [image: Model]
>>>>>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>>>>>
>>>>>> Note that Structured Streaming does not materialize the entire table.
>>>>>> It reads the latest available data from the streaming data source,
>>>>>> processes it incrementally to update the result, and then discards the
>>>>>> source data. It only keeps around the minimal intermediate *state* data
>>>>>> as required to update the result (e.g. intermediate counts in the
>>>>>> earlier example).
>>>>>>
>>>>> My question is: on one hand, the diagram shows the input table to
>>>>> truly be unbounded, with data constantly arriving into this "table".
>>>>> But, on the other hand, it also says that it discards the source data.
>>>>> Then what is the meaning of the unbounded table in the diagram above,
>>>>> showing incremental data arriving and sitting in this unbounded input
>>>>> table? Moreover, it also says that it keeps only the intermediate data
>>>>> (i.e. the intermediate counts). This sounds contradictory in my head.
>>>>>
>>>>> Could you please clarify what it is ultimately supposed to be?
>>>>>
>>>>> Regards
>>>>> Sheel
>>>>>
>>>>> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi <sheelstera@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Russell,
>>>>>>
>>>>>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive
>>>>>> video at https://www.youtube.com/watch?v=RmUn5vHlevc and that, along
>>>>>> with your explanation, made me realize that the DataFrame is the new
>>>>>> DStream in Structured Streaming. If my understanding is correct, I
>>>>>> request you to clarify the 2 points below:
>>>>>>
>>>>>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>>>>>> process, and then at time instant T2 you have 5 newer items streaming
>>>>>> in to be processed. *Structured Streaming* says that the DF is treated
>>>>>> as an unbounded table and hence 15 items will be processed together.
>>>>>> Does this mean that on Iteration 1 (i.e. time instant T1) the Catalyst
>>>>>> Optimizer in the code generation phase creates an RDD of 10 elements,
>>>>>> and on Iteration 2 (i.e. time instant T2) the Catalyst Optimizer
>>>>>> creates an RDD of 15 elements?
>>>>>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>>>>>> data ..."* Could you elaborate a bit more on what this static piece
>>>>>> of data refers to? Are you referring to the 10 records that had
>>>>>> already arrived at T1 and are now sitting as old static data in the
>>>>>> unbounded table?
>>>>>>
>>>>>> Regards
>>>>>> Sheel
>>>>>>
>>>>>>
>>>>>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>
>>>>>>> Dataframes describe the calculation to be done, but the underlying
>>>>>>> implementation is an "Incremental Query". That is, the dataframe code
>>>>>>> is executed repeatedly, with Catalyst adjusting the final execution
>>>>>>> plan on each run. Some parts of the plan refer to static pieces of
>>>>>>> data, others refer to data which is pulled in on each iteration. None
>>>>>>> of this changes the DataFrame objects themselves.
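>>>>>>>
>>>>>>> A minimal sketch of that point (using foreachBatch, available in
>>>>>>> Spark 2.4+, and the wordCounts definition from the programming
>>>>>>> guide): every micro-batch hands you a fresh, immutable DataFrame,
>>>>>>> while the streaming DataFrame defined up front is never modified.
>>>>>>>
>>>>>>> import org.apache.spark.sql.DataFrame
>>>>>>>
>>>>>>> val query = wordCounts.writeStream
>>>>>>>   .outputMode("complete")
>>>>>>>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>>>>>>>     batchDF.show() // a new immutable DataFrame per batch
>>>>>>>   }
>>>>>>>   .start()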
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi <sheelstera@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>> Structured Streaming treats a stream as an unbounded table in the
>>>>>>>> form of a DataFrame. Continuously flowing data from the stream keeps
>>>>>>>> getting added to this DataFrame (which is the unbounded table), which
>>>>>>>> warrants a change to the DataFrame; this violates the very basic
>>>>>>>> nature of a DataFrame, since a DataFrame by its nature is immutable.
>>>>>>>> This sounds contradictory. Is there an explanation for this?
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Sheel
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best Regards,
>>>
>>> Sheel Pancholi
>>>
>>> *Mob: +91 9620474620*
>>>
>>> *Connect with me on: Twitter <https://twitter.com/sheelstera> | LinkedIn
>>> <http://in.linkedin.com/in/sheelstera>*
>>>
>>> *Write to me at: Sheel@Yahoo!! <sheelpancholi@yahoo.com> | Sheel@Gmail!!
>>> <sheelstera@gmail.com> | Sheel@Windows Live <sheelstera@live.com>*
>>>
>>> *Save a tree* - please do not print this email unless you really need
>>> to!
>>>
>>
