spark-dev mailing list archives

From Enrico Minack <m...@Enrico.Minack.dev>
Subject Re: Observable Metrics on Spark Datasets
Date Fri, 19 Mar 2021 09:55:09 GMT
I'll sketch out a PR so we can talk code and move the discussion there.
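
In the meantime, here is roughly what I have in mind - just a sketch, with all
names and details up for debate in the PR (the df.observe(observation) call from
my example further down would delegate to something like `on` below). It also
gives one possible answer to Wenchen's question: in batch mode, `get` simply
blocks until the action on the observed Dataset has finished or failed. A
PySpark Observation would only need to call into this JVM class, so all that
crosses over to Python is a single Row.

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration
    import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    class Observation(name: String, expr: Column, exprs: Column*) {
      private val result = Promise[Row]()
      @volatile private var session: Option[SparkSession] = None

      // picks up the metrics row of this observation once the query finishes or fails
      private val listener = new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
          qe.observedMetrics.get(name).foreach(result.trySuccess)
        override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
          result.tryFailure(exception)
      }

      // attaches the observation to a Dataset and registers the listener
      def on[T](ds: Dataset[T]): Dataset[T] = {
        session = Some(ds.sparkSession)
        ds.sparkSession.listenerManager.register(listener)
        ds.observe(name, expr, exprs: _*)
      }

      // blocks until the observed action has finished, then unregisters the
      // listener and returns the single metrics row
      def get: Row = {
        val row = Await.result(result.future, Duration.Inf)
        session.foreach(_.listenerManager.unregister(listener))
        row
      }
    }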



On 18.03.21 at 14:55, Wenchen Fan wrote:
> I think a listener-based API makes sense for streaming (since you need 
> to keep watching the result), but may not be very reasonable for batch 
> queries (you only get the result once). The idea of Observation looks 
> good, but we should define what happens if `observation.get` is called 
> before the batch query finishes.
>
> Can we have a PR for it so that we can have more detailed discussions?
>
> On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim 
> <kabhwan.opensource@gmail.com>
> wrote:
>
>     Please follow up on the discussion in the original PR.
>     https://github.com/apache/spark/pull/26127
>
>     Dataset.observe() relies on the query listener for batch queries,
>     which is an "unstable" API - that's why we decided not to add an
>     example for the batch query. For streaming queries, it relies on
>     the streaming query listener, which is a stable API. That said,
>     personally I'd consider the new API a fit for streaming queries
>     first, and then see whether it fits batch queries as well.
>
>     If we find Dataset.observe() to be useful enough for batch queries,
>     we'd probably be better off discussing how to provide these metrics
>     through a stable API (so that Scala users could leverage it), and
>     come back to PySpark later. That looks like the first thing to do
>     if we have a consensus on the usefulness of observable metrics on
>     batch queries.
>
>
>     On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack
>     <mail@enrico.minack.dev> wrote:
>
>         I am focusing on batch mode, not streaming mode. I would argue
>         that Dataset.observe() is equally useful for large batch
>         processing. If you need some motivating use cases, please let
>         me know.
>
>         Anyhow, the documentation of observe states that it works for
>         both batch and streaming. And in batch mode, the helper class
>         Observation helps reduce code and avoid repetition.
>
>         The PySpark implementation of the Observation class can
>         implement *all* methods by merely calling into their JVM
>         counterparts, where the locking, listening, registration and
>         un-registration happen. I think this qualifies as: "all the
>         logic happens in the JVM". All that is transferred to Python
>         is a row's data. No listeners needed.
>
>         Enrico
>
>
>
>         On 16.03.21 at 00:13, Jungtaek Lim wrote:
>>         If I remember correctly, the major audience of the "observe"
>>         API is Structured Streaming in micro-batch mode. From the
>>         example, the abstraction in 2 isn't something that works with
>>         Structured Streaming. It could still be done with a callback,
>>         but the question remains how much complexity the abstraction
>>         can hide.
>>
>>         I see you're focusing on PySpark - I'm not sure whether
>>         there's an intention not to expose the query listener /
>>         streaming query listener to PySpark, but if there's some
>>         valid reason for that, I'm not sure we'd like to expose them
>>         to PySpark in any way. 2 doesn't make sense to me with
>>         PySpark - how do you ensure all the logic happens in the JVM
>>         while you can still leverage these values from PySpark?
>>         (I see there's support for listeners with DStream in PySpark,
>>         so there might be reasons not to add the same for SQL/SS.
>>         Probably a lesson learned?)
>>
>>
>>         On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack
>>         <mail@enrico.minack.dev> wrote:
>>
>>             Hi Spark-Devs,
>>
>>             the observable metrics that have been added to the
>>             Dataset API in 3.0.0 are a great improvement over the
>>             Accumulator APIs that seem to have much weaker
>>             guarantees. I have two questions regarding follow-up
>>             contributions:
>>
>>             *1. Add observe to Python DataFrame*
>>
>>             As far as I can see from the master branch, there is no
>>             equivalent in the Python API. Is this planned to happen,
>>             or is it missing because there are no listeners in
>>             PySpark, which would render this method useless in
>>             Python? I would be happy to contribute here.
>>
>>             *2. Add Observation class to simplify result access*
>>
>>             The Dataset.observe method
>>             <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
>>             requires users to register a QueryExecutionListener or
>>             StreamingQueryListener to obtain the result. I think for
>>             simple setups, this could be hidden behind a common helper
>>             class, here called Observation, which reduces the usage of
>>             observe to three lines of code:
>>
>>             // our Dataset (this does not count as a line of code)
>>             val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
>>
>>             // define the observation we want to make
>>             val observation = Observation("stats", count($"id"), sum($"id"))
>>
>>             // add the observation to the Dataset and execute an action on it
>>             val cnt = df.observe(observation).count()
>>
>>             // retrieve the result
>>             assert(observation.get === Row(4, 15))
>>
>>             The Observation class can handle the registration and
>>             de-registration of the listener, as well as properly
>>             accessing the result across thread boundaries.
>>
>>             With *2.*, the observe method can be added to PySpark
>>             without introducing listeners there at all. All the logic
>>             is happening in the JVM.
>>
>>             Thanks for your thoughts on this.
>>
>>             Regards,
>>             Enrico
>>
