spark-user mailing list archives

From "HENSLEE, AUSTIN L" <ah6...@att.com>
Subject Re: top-k function for Window
Date Tue, 03 Jan 2017 18:59:54 GMT
Andy,

You might also want to check out the Algebird libraries from Twitter. They have topK and a
lot of other helpful functions. I’ve used the Algebird topK successfully on very large data
sets.
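
For example, here’s a sketch of what that looks like (not my production code; it assumes
Algebird is on the classpath and you have an RDD of (key, count) pairs, and the exact
TopKMonoid API may differ across Algebird versions):

import com.twitter.algebird.TopKMonoid
import org.apache.spark.rdd.RDD

// Per-key top-4 using Algebird's TopKMonoid, merged via aggregateByKey.
// TopKMonoid keeps the k least items under the ordering, so reverse the
// ordering to keep the largest counts (verify on your Algebird version).
def top4PerKey(pairs: RDD[(String, Long)]): RDD[(String, List[Long])] = {
  val mon = new TopKMonoid[Long](4)(Ordering[Long].reverse)
  pairs
    .aggregateByKey(mon.zero)(
      (acc, v) => mon.plus(acc, mon.build(v)), // fold each count in
      mon.plus                                 // merge partial top-k's
    )
    .mapValues(_.items) // the top 4 counts per key
}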

You can also use Spark SQL to do a “poor man’s” topK. This depends on how scrupulous
you are about your topKs; for example, ROW_NUMBER() breaks ties arbitrarily (I can expound
on this, if needed).

I obfuscated the field names before pasting this into email; I think I got them all consistent.

Here’s the meat of the topK part (found on Stack Overflow, but I don’t have a reference).
This one takes the top 4, hence the “rowNum <= 4”:

SELECT time_bucket,
       identifier1,
       identifier2,
       incomingCount
  FROM (SELECT time_bucket,
               identifier1,
               identifier2,
               incomingCount,
               ROW_NUMBER() OVER (PARTITION BY time_bucket,
                                               identifier1
                                      ORDER BY incomingCount DESC) AS rowNum
          FROM tablename) tmp
  WHERE rowNum <= 4
  ORDER BY time_bucket, identifier1, rowNum

And here’s the count and order-by that produces incomingCount:

SELECT time_bucket,
       identifier1,
       identifier2,
       COUNT(identifier2) AS incomingCount
  FROM table
  GROUP BY time_bucket,
           identifier1,
           identifier2
  ORDER BY time_bucket,
           identifier1,
           COUNT(identifier2) DESC
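
If it helps, this is roughly how the two queries chain together in Spark 2.x (a sketch;
“spark” is a SparkSession, and “events” is a placeholder for the raw table name):

// Sketch: compute the counts, then register them as the view the topK
// query reads from. "events" and "tablename" are placeholder names.
val counts = spark.sql("""
  SELECT time_bucket, identifier1, identifier2,
         COUNT(identifier2) AS incomingCount
    FROM events
   GROUP BY time_bucket, identifier1, identifier2
""")
counts.createOrReplaceTempView("tablename")

val top4 = spark.sql("""
  SELECT time_bucket, identifier1, identifier2, incomingCount
    FROM (SELECT time_bucket, identifier1, identifier2, incomingCount,
                 ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
                                        ORDER BY incomingCount DESC) AS rowNum
            FROM tablename) tmp
   WHERE rowNum <= 4
   ORDER BY time_bucket, identifier1, rowNum
""")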


From: Andy Dang <namd88@gmail.com>
Date: Tuesday, January 3, 2017 at 7:06 AM
To: user <user@spark.apache.org>
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with windowing in the Dataset world?

I have a snippet of code that filters the data down to the top-k, but the keys are skewed:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know it can sort the data locally and get
the local rank first. What it ends up doing is a full sort by key across partitions, and this
blew up the cluster since the keys are heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator => topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, especially with a
Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's
an obvious way that I missed.
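
Something like this untested sketch is what I have in mind on the Dataset side (Record, key
and dateTime stand in for my real schema):

import org.apache.spark.sql.Dataset

// Untested sketch: take a local top-10 per key inside each partition
// before any shuffle. Matches the window above (orderBy(dateTime)
// ascending, rank <= 10), so it keeps the 10 earliest rows per key.
case class Record(key: String, dateTime: Long)

def localTop10(ds: Dataset[Record]): Dataset[Record] = {
  import ds.sparkSession.implicits._
  ds.mapPartitions { rows =>
    rows.toSeq
      .groupBy(_.key)
      .valuesIterator
      .flatMap(_.sortBy(_.dateTime).take(10))
  }
}

// Running the original Window over the (much smaller) result would then
// give the exact global top 10 per key.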

-------
Regards,
Andy