spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Burak Yavuz <brk...@gmail.com>
Subject Re: [SS] Why is a streaming aggregation required for complete output mode?
Date Fri, 18 Aug 2017 22:10:56 GMT
Hi Jacek,

The way the memory sink is architected at the moment is that it either
appends a row (append/update mode) or replaces all rows (complete mode).
When a user specifies a checkpoint location, the guarantee Structured
Streaming provides is that output sinks will not lose data and will be able
to serve the results according to the specified output mode.

Now, what makes Complete mode so special for the memory sink? With
aggregations, and complete mode, all the results are provided from the
StateStores, therefore we can accept a checkpoint location (where the
StateStores save the data), and we can recreate the memory table at each
trigger.

Why doesn't append mode work for a Memory Sink? The memory sink keeps data
from previous triggers in-memory. It doesn't persist it anywhere. If you
were to query the table after restarting your stream, all the data would've
been lost, and in order to retrieve the existing state of the memory table,
you would need to process all the data that from scratch.

Does all this make sense? Happy to elaborate.

Best,
Burak





On Fri, Aug 18, 2017 at 12:52 PM, Jacek Laskowski <jacek@japila.pl> wrote:

> Hi,
>
> This is what I could find in Spark's source code about the
> `recoverFromCheckpointLocation` flag (that led me to explore the
> complete output mode for dropDuplicates operator).
>
> `recoverFromCheckpointLocation` flag is enabled by default and varies
> per sink (memory, console and others).
>
> * `memory` sink has the flag enabled for Complete output mode only
>
> * `foreach` sink has the flag always enabled
>
> * `console` sink has the flag always disabled
>
> * all other sinks have the flag always enabled
>
> As agreed with Michael
> (https://issues.apache.org/jira/browse/SPARK-21667) is to make console
> sink accepting the flag as enabled which would make memory sink the
> only one left with the flag enabled for Complete output.
>
> And I thought I've been close to understand Structured Streaming :)
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Aug 18, 2017 at 9:21 PM, Holden Karau <holden@pigscanfly.ca>
> wrote:
> > My assumption is it would be similar though, in memory sink of all of
> your
> > records would quickly overwhelm your cluster, but in aggregation it
> could be
> > reasonable. But there might be additional reasons on top of that.
> >
> > On Fri, Aug 18, 2017 at 11:44 AM Holden Karau <holden@pigscanfly.ca>
> wrote:
> >>
> >> Ah yes I'm not sure about the workings of the memory sink.
> >>
> >> On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski <jacek@japila.pl>
> wrote:
> >>>
> >>> Hi Holden,
> >>>
> >>> Thanks a lot for a bit more light on the topic. That however does not
> >>> explain why memory sink requires Complete for a checkpoint location to
> >>> work. The only reason I used Complete output mode was to meet the
> >>> requirements of memory sink and that got me thinking why would the
> >>> already-memory-hungry memory sink require yet another thing to get the
> >>> query working.
> >>>
> >>> On to exploring the bits...
> >>>
> >>> Pozdrawiam,
> >>> Jacek Laskowski
> >>> ----
> >>> https://medium.com/@jaceklaskowski/
> >>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >>> Follow me at https://twitter.com/jaceklaskowski
> >>>
> >>>
> >>> On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <holden@pigscanfly.ca>
> >>> wrote:
> >>> > So performing complete output without an aggregation would require
> >>> > building
> >>> > up a table of the entire input to write out at each micro batch. This
> >>> > would
> >>> > get prohibitively expensive quickly. With an aggregation we just need
> >>> > to
> >>> > keep track of the aggregates and update them every batch, so the
> memory
> >>> > requirement is more reasonable.
> >>> >
> >>> > (Note: I don't do a lot of work in streaming so there may be
> additional
> >>> > reasons, but these are the ones I remember from when I was working
on
> >>> > looking at integrating ML with SS).
> >>> >
> >>> > On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <jacek@japila.pl>
> >>> > wrote:
> >>> >>
> >>> >> Hi,
> >>> >>
> >>> >> Why is the requirement for a streaming aggregation in a streaming
> >>> >> query? What would happen if Spark allowed Complete without a single
> >>> >> aggregation? This is the latest master.
> >>> >>
> >>> >> scala> val q = ids.
> >>> >>      |   writeStream.
> >>> >>      |   format("memory").
> >>> >>      |   queryName("dups").
> >>> >>      |   outputMode(OutputMode.Complete).  // <-- memory sink
> supports
> >>> >> checkpointing for Complete output mode only
> >>> >>      |   trigger(Trigger.ProcessingTime(30.seconds)).
> >>> >>      |   option("checkpointLocation", "checkpoint-dir"). // <--
use
> >>> >> checkpointing to save state between restarts
> >>> >>      |   start
> >>> >> org.apache.spark.sql.AnalysisException: Complete output mode not
> >>> >> supported when there are no streaming aggregations on streaming
> >>> >> DataFrames/Datasets;;
> >>> >> Project [cast(time#10 as bigint) AS time#15L, id#6]
> >>> >> +- Deduplicate [id#6], true
> >>> >>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
> >>> >>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
> >>> >>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3],
> [_1#2,
> >>> >> _2#3]
> >>> >>
> >>> >>   at
> >>> >>
> >>> >> org.apache.spark.sql.catalyst.analysis.
> UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$
> UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.
> scala:297)
> >>> >>   at
> >>> >>
> >>> >> org.apache.spark.sql.catalyst.analysis.
> UnsupportedOperationChecker$.checkForStreaming(
> UnsupportedOperationChecker.scala:115)
> >>> >>   at
> >>> >>
> >>> >> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(
> StreamingQueryManager.scala:232)
> >>> >>   at
> >>> >>
> >>> >> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(
> StreamingQueryManager.scala:278)
> >>> >>   at
> >>> >>
> >>> >> org.apache.spark.sql.streaming.DataStreamWriter.
> start(DataStreamWriter.scala:249)
> >>> >>   ... 57 elided
> >>> >>
> >>> >> Pozdrawiam,
> >>> >> Jacek Laskowski
> >>> >> ----
> >>> >> https://medium.com/@jaceklaskowski/
> >>> >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >>> >> Follow me at https://twitter.com/jaceklaskowski
> >>> >>
> >>> >> ------------------------------------------------------------
> ---------
> >>> >> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >>> >>
> >>> > --
> >>> > Cell : 425-233-8271
> >>> > Twitter: https://twitter.com/holdenkarau
> >>
> >>
> >>
> >>
> >> --
> >> Cell : 425-233-8271
> >> Twitter: https://twitter.com/holdenkarau
> >
> > --
> > Cell : 425-233-8271
> > Twitter: https://twitter.com/holdenkarau
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Mime
View raw message