spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrew Leverentz <>
Subject Re: Custom aggregations: modular and lightweight solutions?
Date Tue, 13 Aug 2019 19:59:34 GMT
Here's a simpler example that I think gets at the heart of what I'm trying
to do: DynamicSchemaExample.scala
<>.  Here,
I'm dynamically creating a sequence of Rows and also dynamically creating a
corresponding schema (StructType), but the RowEncoder derived from the
schema doesn't seem to handle the nested structure of the Rows.  This
example fails with a similar error (in this case, "scala.Tuple2$mcDD$sp is
not a valid external type for schema of struct<_1:double,_2:double>").

If I could find a way to get this example working (for arbitrary values of
rowSize), I suspect that it would also give me a solution to the
custom-aggregation issue I outlined in my previous email.  Any suggestions
would be much appreciated.

~ Andrew

On Mon, Aug 12, 2019 at 5:31 PM Andrew Leverentz <> wrote:

> Hi All,
> I'm attempting to clean up some Spark code which performs groupByKey /
> mapGroups to compute custom aggregations, and I could use some help
> understanding the Spark API's necessary to make my code more modular and
> maintainable.
> In particular, my current approach is as follows:
>    - Start with a Dataset[CaseClass1]
>    - Apply groupByKey(f), where f is a function that extracts a tuple of
>    keys
>    - Apply mapGroups(g), where g computes multiple custom aggregations:
>       - Iterate through the rows in each group, updating a large, mutable
>       CustomState object.
>       - At the end of each group, transform the current key and the
>       CustomState into an instance of CaseClass2.
> In other words, we start with a dataset of CaseClass1 objects and end up
> with a dataset of CaseClass2 objects, using instances of a complex
> CustomState class to store the intermediate state during the aggregation.
> We have dozens of custom aggregation calculations to perform on this data,
> and I'd like to be able streamline the process of introducing new
> aggregations and comparing multiple parameterized variations of the same
> aggregations side-by-side.  The current approach requires us to touch
> several tightly coupled pieces of code in order to add simple variations to
> existing aggregate functions.
> The UDAF API seems to be designed for this use case, but I've found it to
> be just as cumbersome to create new UDAF's as it is to maintain my current
> code.
> To address this, I've tried a couple of approaches (described below),
> although I've run into problems with both of them.
> At a high level, both of my approaches require a Dataset[T], a key
> extractor function (T => K), and a collection of instances of a custom
> class GroupingCalculation[T, S, R].  Here, T is the data type of each row
> in the dataset, K is the type of the key by which the rows should be
> grouped, S is the type of the intermediate state during aggregation, and R
> is the result type of each aggregation.  In this context, the data types T
> and K are fixed, but the state and result types (S and R) may vary among
> the GroupingCalculation instances.  The resulting DataFrame will have Rows
> which are basically concatenations of {K, R1, R2, ..., Rn}, where R1, ...,
> Rn are the result types for the GroupingCollection instances.
> (1) My first approach operates by constructing a
> UserDefinedAggregateFunction by applying ScalaReflection.schemaFor[...] to
> T, S, and R.  After some digging and experimentation, I found a way to use
> CatalystTypeConverters and ExpressionEncoders to populate the
> MutableAggregationBuffer.  Unfortunately, once I finally got it running,
> this approach yielded a runtime 10x slower than the original approach
> described above. I suspect that adding an extra encoding/decoding layer on
> top of the UDAF was what slowed it down.  Because of this, I'm setting
> aside this approach for now.
> (2) Using a similar API to (1), I replaced the implementation with one
> that uses groupByKey and mapGroups.  This bypasses the need for creating a
> wrapper around UDAF.  Also, the internal state, rather than being encoded
> in a DataFrame, is simply stored in one mutable ArrayBuffer[Any] per
> group.  An implementation of this approach is available here:
> I feel that this implementation is promising, but I haven't been able to
> get some of my test cases in the above Gist to pass.  In particular, my
> test cases "Test grouping calculations with various combinations of case
> classes" and "Test firstAndOnly" fail with the following runtime error
> messages, respectively:
>    - "TestCase3 is not a valid external type for schema of
>    struct<a:int,b:double>"
>    - "scala.Some is not a valid external type for schema of string"
> Would anyone be able to help me diagnose the runtime errors with approach
> (2), or to suggest a better alternative?
> Thanks,
> ~ Andrew

View raw message