spark-user mailing list archives

From "Mendelson, Assaf" <>
Subject RE: top-k function for Window
Date Wed, 04 Jan 2017 07:56:06 GMT
Assume you have a UDAF which looks like this:

-          Input: the value

-          Buffer: K elements

-          Output: an array (holding the K elements)

-          Init: initialize all elements to some irrelevant value (e.g. Int.MinValue)

-          Update: scan the buffer to find the spot whose element is smaller than the current
value, shift everything after it forward, and insert the value (i.e. a sorted insert)

-          Merge: "merge sort" the two buffers

-          Evaluate: turn the buffer into an array
Then run the UDAF on the groupby.

The result would be an array of (up to) K elements per key. To turn it back into K rows, all
you need to do is explode it.

Assuming that K is small, the UDAF calculation would be much faster than a full sort
(it only sorts within very small buffers of size K).
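The Update and Merge steps above can be sketched in plain Scala (outside Spark) as follows. This is only an illustration of the buffer logic the UDAF would carry; `TopKBuffer`, `insertTopK`, and `mergeTopK` are hypothetical names, not Spark API, and the buffer is assumed to be kept in descending order and padded with `Int.MinValue` as described above:

```scala
object TopKBuffer {
  // Sorted insert: buffer is the k largest values seen so far, descending.
  // Find the first slot smaller than the value, shift the tail, insert.
  def insertTopK(buffer: Array[Int], value: Int): Array[Int] = {
    val pos = buffer.indexWhere(_ < value)
    if (pos == -1) buffer // value is smaller than everything kept; buffer unchanged
    else (buffer.take(pos) :+ value) ++ buffer.slice(pos, buffer.length - 1)
  }

  // Merge step: combine two sorted buffers, keep only the k largest.
  def mergeTopK(a: Array[Int], b: Array[Int], k: Int): Array[Int] =
    (a ++ b).sorted(Ordering[Int].reverse).take(k)
}
```

Because both buffers are already sorted and of size K, a real implementation could merge them in O(K) rather than re-sorting, but for small K it hardly matters.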

From: Andy Dang []
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don’t even need a window function, you can simply
use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or reduce-side sorting).


On Tue, Jan 3, 2017 at 2:07 PM, Mendelson, Assaf <> wrote:
You can write a UDAF in which the buffer contains the top K and manage it. This means you
don’t need to sort at all. Furthermore, in your example you don’t even need a window function;
you can simply use groupby and explode.
Of course, this is only relevant if k is small…

From: Andy Dang [<>]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know it can sort the data locally and get
the local rank first. What it ends up doing is a full sort by key, and since the keys are
heavily skewed this blew up the cluster.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, especially with a
Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's
an obvious way that I missed.
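For reference, the per-partition step in the RDD snippet above could look like the following plain-Scala sketch. `topK` here is a hypothetical helper (the original message leaves it unspecified); it keeps only the k largest elements from the iterator using a bounded min-heap, so each partition emits at most k rows before any shuffle:

```scala
import scala.collection.mutable

// Hypothetical helper for rdd.mapPartitions(iterator => topK(iterator, k)):
// a min-heap of size k holds the k largest elements seen so far.
def topK(iterator: Iterator[Int], k: Int): Iterator[Int] = {
  val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse) // min-heap
  for (v <- iterator) {
    if (heap.size < k) heap.enqueue(v)
    else if (v > heap.head) { heap.dequeue(); heap.enqueue(v) } // evict current minimum
  }
  heap.dequeueAll.reverse.iterator // largest first
}
```

Each partition then contributes at most k candidates, and a second `topK` pass over the collected candidates yields the global result without a full sort.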

