spark-user mailing list archives

From Everett Anderson <>
Subject Re: Help taking last value in each group (aggregates)
Date Mon, 28 Aug 2017 22:05:46 GMT
I'm still unclear on whether orderBy/groupBy plus aggregates is a viable
approach, or when one can rely on the last or first aggregate functions, but
a working alternative is to use window functions with row_number and a
filter, roughly like this:

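import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

val reverseOrdering = Seq("a", "b").map(col => df(col).desc)

val windowSpec = Window.partitionBy("group_id").orderBy(reverseOrdering: _*)

// Assumed select list: only "group_id" and the filter are certain here.
df.select($"group_id",
          row_number().over(windowSpec).as("row_number"))
  .filter("row_number == 1")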

Would love to know if there's a better way!
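
For reference, the window-function idea described above could be fleshed out roughly as follows. This is only a sketch under assumptions: the object and method names (LastRowPerGroup, lastRowWithTotal) and the sample data are made up for illustration, and the column names are taken from the original question. It uses two windows: an ordered one only for row_number, and an unordered one for the sum, because a window with an orderBy defaults to a running frame and would not give the group total.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, sum}

object LastRowPerGroup {
  // Hypothetical helper: one row per group_id, carrying the group total and
  // the row_id of the row that sorts last by (a, b).
  def lastRowWithTotal(df: DataFrame): DataFrame = {
    // Ordered window, descending, so the last row of each group ranks first.
    val ranked = Window.partitionBy("group_id").orderBy(col("a").desc, col("b").desc)
    // Unordered window: its frame is the whole partition, so sum is the total.
    val whole = Window.partitionBy("group_id")

    df.withColumn("total", sum("col_to_sum").over(whole))
      .withColumn("rn", row_number().over(ranked))
      .filter(col("rn") === 1)
      .select(col("group_id"), col("total"), col("row_id").as("last_row_id"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("last-row-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data using the column names from the question.
    val df = Seq(
      (1, 1, 1, 10L, "r1"),
      (1, 2, 1, 20L, "r2"),
      (2, 1, 1, 5L, "r3")
    ).toDF("group_id", "a", "b", "col_to_sum", "row_id")

    lastRowWithTotal(df).show()
    spark.stop()
  }
}
```

If two rows in a group tie on (a, b), row_number picks one of them arbitrarily, so a unique tie-breaker column would need to be added to the ordering to keep the result deterministic.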

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson <> wrote:

> Hi,
> I'm struggling a little with some unintuitive behavior with the Scala API.
> (Spark 2.0.2)
> I wrote something like
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
> and expected a result with a unique group_id column, a column called
> "total" that's the sum of all col_to_sum in each group, and a column called
> "last_row_id" that's the last row_id seen in each group when the groups are
> sorted by columns a and b.
> However, the result is actually non-deterministic and changes based on the
> initial sorting and partitioning of df.
> I also tried
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
> Looking through the code
> <>,
> both the Last and First aggregate functions have this comment:
> Even if [[Last]] is used on an already sorted column, if we do partial
> aggregation and final aggregation (when mergeExpression is used) its
> result will not be deterministic (unless the input table is sorted and
> has a single partition, and we use a single reducer to do the
> aggregation.).
> Some questions:
>    1. What's the best way to take some values from the last row in an
>    ordered group while performing some other aggregates over the entire group?
>    2. Given these comments on last and first, when would these functions
>    be useful? It would be rare to bring an entire Spark table to a single
>    partition.
> Thanks!
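
The source comment quoted above can be illustrated with a toy model in plain Scala. This is not Spark's code: the names (LastMergeSketch, partialLast, merge, lastOf) are hypothetical, and the model assumes only that each partition first computes its own "last" value and that merging two partial results keeps whichever one is merged later. Under that assumption, the answer depends on the order in which partial results arrive, even when the rows themselves are sorted.

```scala
object LastMergeSketch {
  // Partial aggregation: each partition reports the last value it saw.
  def partialLast[A](partition: Seq[A]): Option[A] = partition.lastOption

  // Final aggregation: merging two partial results keeps the right-hand one
  // when it is defined, so the outcome depends on merge order.
  def merge[A](left: Option[A], right: Option[A]): Option[A] = right.orElse(left)

  // Aggregate every partition separately, then merge the partial results
  // in the order the partitions are given.
  def lastOf[A](partitions: Seq[Seq[A]]): Option[A] =
    partitions.map(partialLast).foldLeft(Option.empty[A])(merge)

  def main(args: Array[String]): Unit = {
    // Rows of one group, globally sorted, split across two partitions.
    val p1 = Seq("row1", "row2")
    val p2 = Seq("row3", "row4")

    println(lastOf(Seq(p1, p2))) // partial results merged in sorted order
    println(lastOf(Seq(p2, p1))) // same rows, partial results merged in reverse
  }
}
```

With a single partition there is only one partial result, so nothing is left to reorder, which matches the exception stated in the comment.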
