spark-user mailing list archives

From: Andrew Leverentz <>
Subject: Custom aggregations: modular and lightweight solutions?
Date: Tue, 13 Aug 2019 00:31:34 GMT
Hi All,

I'm attempting to clean up some Spark code which performs groupByKey /
mapGroups to compute custom aggregations, and I could use some help
understanding the Spark APIs necessary to make my code more modular and
maintainable.

In particular, my current approach is as follows:

   - Start with a Dataset[CaseClass1]
   - Apply groupByKey(f), where f is a function that extracts a tuple of
   key fields from each row.
   - Apply mapGroups(g), where g computes multiple custom aggregations:
      - Iterate through the rows in each group, updating a large, mutable
      CustomState object.
      - At the end of each group, transform the current key and the
      CustomState into an instance of CaseClass2.

In other words, we start with a dataset of CaseClass1 objects and end up
with a dataset of CaseClass2 objects, using instances of a complex
CustomState class to store the intermediate state during the aggregation.
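
To make that concrete, the pattern looks roughly like the sketch below.
(CaseClass1, CaseClass2, and CustomState are stand-ins with made-up fields
here; the key extractor and the aggregation logic are purely illustrative.)

  import org.apache.spark.sql.Dataset

  case class CaseClass1(userId: String, category: String, value: Double)
  case class CaseClass2(userId: String, category: String,
                        total: Double, count: Long)

  // Mutable state accumulated while iterating over one group.
  class CustomState {
    var total: Double = 0.0
    var count: Long = 0L
    def update(row: CaseClass1): Unit = { total += row.value; count += 1 }
  }

  def aggregate(ds: Dataset[CaseClass1]): Dataset[CaseClass2] = {
    import ds.sparkSession.implicits._
    ds.groupByKey(r => (r.userId, r.category))          // f: extract key tuple
      .mapGroups { case ((userId, category), rows) =>   // g: fold rows into state
        val state = new CustomState
        rows.foreach(state.update)
        CaseClass2(userId, category, state.total, state.count)
      }
  }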

We have dozens of custom aggregation calculations to perform on this data,
and I'd like to be able to streamline the process of introducing new
aggregations and comparing multiple parameterized variations of the same
aggregations side-by-side.  The current approach requires us to touch
several tightly coupled pieces of code in order to add simple variations to
existing aggregate functions.

The UDAF API seems to be designed for this use case, but I've found it to
be just as cumbersome to create new UDAFs as it is to maintain my current
approach.

To address this, I've tried a couple of approaches (described below),
although I've run into problems with both of them.

At a high level, both of my approaches require a Dataset[T], a key
extractor function (T => K), and a collection of instances of a custom
class GroupingCalculation[T, S, R].  Here, T is the data type of each row
in the dataset, K is the type of the key by which the rows should be
grouped, S is the type of the intermediate state during aggregation, and R
is the result type of each aggregation.  In this context, the data types T
and K are fixed, but the state and result types (S and R) may vary among
the GroupingCalculation instances.  The resulting DataFrame will have Rows
which are basically concatenations of {K, R1, R2, ..., Rn}, where R1, ...,
Rn are the result types for the GroupingCalculation instances.
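
Concretely, the abstraction I have in mind looks roughly like the sketch
below (simplified; the member names are illustrative rather than the exact
ones in my code):

  // One instance per aggregation.
  //   T = row type, S = per-group intermediate state, R = result type
  trait GroupingCalculation[T, S, R] {
    def resultName: String             // output column name for R
    def initialState: S                // fresh state at the start of each group
    def update(state: S, row: T): S    // fold one row into the state
    def result(state: S): R            // finalize the state into the result
  }

  // For example, a sum over one field of CaseClass1:
  val sumOfValues = new GroupingCalculation[CaseClass1, Double, Double] {
    val resultName = "sum_of_values"
    def initialState = 0.0
    def update(state: Double, row: CaseClass1) = state + row.value
    def result(state: Double) = state
  }

Adding a new aggregation (or a parameterized variation of an existing one)
should then amount to adding one more instance to the list that gets passed
in.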

(1) My first approach operates by constructing a
UserDefinedAggregateFunction by applying ScalaReflection.schemaFor[...] to
T, S, and R.  After some digging and experimentation, I found a way to use
CatalystTypeConverters and ExpressionEncoders to populate the
MutableAggregationBuffer.  Unfortunately, once I finally got it running,
this approach yielded a runtime 10x slower than the original approach
described above. I suspect that adding an extra encoding/decoding layer on
top of the UDAF was what slowed it down.  Because of this, I'm setting
aside this approach for now.
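
In outline, that wrapper looked something like the sketch below (heavily
simplified, with the object/Row conversion plumbing elided, since that is
the part I suspect was causing the slowdown):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.catalyst.ScalaReflection
  import org.apache.spark.sql.expressions.MutableAggregationBuffer
  import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
  import org.apache.spark.sql.types.{DataType, StructType}
  import scala.reflect.runtime.universe.TypeTag

  // Wraps one GroupingCalculation[T, S, R] as a UDAF, deriving the schemas
  // from the case classes by reflection.
  class CalculationUdaf[T: TypeTag, S: TypeTag, R: TypeTag](
      calc: GroupingCalculation[T, S, R]) extends UserDefinedAggregateFunction {

    val inputSchema: StructType =
      ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
    val bufferSchema: StructType =
      ScalaReflection.schemaFor[S].dataType.asInstanceOf[StructType]
    val dataType: DataType = ScalaReflection.schemaFor[R].dataType
    val deterministic: Boolean = true

    // The conversions between T/S/R objects and the Row-based buffer (via
    // CatalystTypeConverters / ExpressionEncoders) go here; they add an
    // extra encode/decode step per row, which is my main suspect for the
    // 10x slowdown.
    def initialize(buffer: MutableAggregationBuffer): Unit = ???
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
    def evaluate(buffer: Row): Any = ???
  }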

(2) Using an API similar to (1), I replaced the implementation with one that
uses groupByKey and mapGroups.  This bypasses the need to create a wrapper
around UDAF.  Also, the internal state, rather than being encoded in a
DataFrame, is simply stored in one mutable ArrayBuffer[Any] per group.  An
implementation of this approach is available here:

I feel that this implementation is promising, but I haven't been able to
get some of my test cases in the above Gist to pass.  In particular, my
test cases "Test grouping calculations with various combinations of case
classes" and "Test firstAndOnly" fail with the following runtime error
messages, respectively:

   - "TestCase3 is not a valid external type for schema of
   - "scala.Some is not a valid external type for schema of string"

Would anyone be able to help me diagnose the runtime errors with approach
(2), or to suggest a better alternative?

~ Andrew
