Trying again. Hoping to find some help in figuring out the performance bottleneck we are observing.


On Sun, Oct 30, 2016 at 11:58 AM, Spark User wrote:
Hi All,

I have a UDAF that seems to perform poorly when its input is skewed. I have been debugging the UDAF implementation, but I can't find any code in it that would cause the performance to degrade. Below are more details on the data and the experiments I have run.

Dataset: assume 3 columns, with Column1 being the key.

Column1   Column2   Column3
a         1         x
a         2         x
a         3         x
a         4         x
a         5         x
a         6         z
(5 million rows for a)
a         1000000   y
b         9         y
b         9         y
b         10        y
(3 million rows for b)
(more rows)
(100 million rows in total)

Key a has 5 million rows, and Column2 for a has 1 million unique values.
Key b has 3 million rows, and Column2 for b has 800,000 unique values.

Column3 has only a few hundred unique values (not millions) for both a and b.

In total, 100 million rows are input to the UDAF aggregation. The skew in the data is on keys a and b; all other rows can be ignored, as they do not cause any performance issues or hot partitions.

The code does dataSet.groupBy("Column1").agg(udaf("Column2", "Column3")).

I commented out the bodies of the UDAF's update and merge methods, so the UDAF was essentially doing nothing.

With this code (empty update and merge in the UDAF), processing takes 16 minutes per micro-batch, where each micro-batch contains 100 million rows, including 5 million rows for a with 1 million unique values of Column2.

But when I pass empty values for Column2 with nothing else changed, effectively reducing the 1 million unique values of Column2 to a single unique value (the empty value), the batch processing time drops to 4 minutes.
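To make the two runs above easier for others to reproduce at a smaller scale, here is a minimal, hypothetical test-data generator in plain Python (it does not touch Spark). It follows the shape described earlier: key a with 5 million rows and 1 million distinct Column2 values, key b with 3 million rows and 800,000 distinct values, 100 million rows in total, all divided by a scale factor. The scale factor, the Column3 value names (300 of them), the ten-rows-per-key filler for the non-skewed keys, and blanking Column2 on every row for the empty-value run are my assumptions, not part of the original setup.

```python
import csv
import random
from collections import defaultdict

# Full-scale shape from the description: key -> (row count, distinct Column2 values).
SKEWED_KEYS = {"a": (5_000_000, 1_000_000), "b": (3_000_000, 800_000)}
TOTAL_ROWS = 100_000_000
COLUMN3_DISTINCT = 300   # assumption: "a few hundred" distinct Column3 values
ROWS_PER_OTHER_KEY = 10  # assumption: non-skewed keys are small and evenly sized


def make_rows(scale=1000, empty_column2=False, seed=0):
    """Return (Column1, Column2, Column3) tuples, shrunk by `scale`.

    With empty_column2=True every Column2 value is "" (assumption: applied
    to all rows), mimicking the second, 4-minute run.
    """
    rng = random.Random(seed)
    column3_values = [f"c3_{i}" for i in range(COLUMN3_DISTINCT)]
    rows = []
    for key, (n_rows, n_distinct) in SKEWED_KEYS.items():
        n_rows, n_distinct = n_rows // scale, n_distinct // scale
        for i in range(n_rows):
            # Cycling i % n_distinct yields exactly n_distinct Column2 values.
            col2 = "" if empty_column2 else str(i % n_distinct)
            rows.append((key, col2, rng.choice(column3_values)))
    # Fill the remainder with many small keys, so they create no hot partitions.
    n_other = TOTAL_ROWS // scale - len(rows)
    n_other_keys = max(1, n_other // ROWS_PER_OTHER_KEY)
    for j in range(n_other):
        col2 = "" if empty_column2 else str(rng.randrange(1_000_000))
        rows.append((f"k{j % n_other_keys}", col2, rng.choice(column3_values)))
    return rows


def summarize(rows, keys=("a", "b")):
    """Map each requested key to (row count, distinct Column2 count)."""
    counts, distinct = defaultdict(int), defaultdict(set)
    for c1, c2, _ in rows:
        if c1 in keys:
            counts[c1] += 1
            distinct[c1].add(c2)
    return {k: (counts[k], len(distinct[k])) for k in keys}


def write_csv(rows, path):
    """Write the rows with a header line, ready to be loaded into a Dataset."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Column1", "Column2", "Column3"])
        writer.writerows(rows)
```

For example, write_csv(make_rows(scale=100), "skewed.csv") produces a 1-million-row file (50,000 rows for a with 10,000 distinct Column2 values) that could be read with spark.read.option("header", "true").csv("skewed.csv") and run through the same groupBy/agg, once with the default data and once with empty_column2=True.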

So I am trying to understand why there is such a big performance difference. What in the UDAF causes the processing time to increase fourfold (from 4 to 16 minutes) when the data is skewed as described above?

Any insight from Spark developers, contributors, or anyone else with a deeper understanding of UDAFs would be helpful.