spark-issues mailing list archives

From "Hyukjin Kwon (Jira)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-30089) count over Window function with orderBy gives wrong results
Date Mon, 02 Dec 2019 02:13:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-30089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985752#comment-16985752 ]

Hyukjin Kwon commented on SPARK-30089:
--------------------------------------

The results are expected: when a window spec has an orderBy, the default frame is unbounded
preceding to current row, so `count` becomes a running count. To count the whole partition you
need a frame ending at `unboundedFollowing`, but `row_number` requires a frame ending at the
current row, so the two columns need separate window specs.

You should do something like this:

{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = spark
  .range(20)
  .drop("id")
  .withColumn("event_id", (rand() * 5).cast("int"))
  .withColumn("secondary_key", rand())
  .withColumn("time", (rand() * 2).cast("int"))

// row_number requires a frame that ends at the current row.
val row_spec = Window
  .partitionBy("time")
  .orderBy("event_id", "secondary_key")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// Counting the whole partition needs a frame unbounded on both sides.
val spec = Window
  .partitionBy("time")
  .orderBy("event_id", "secondary_key")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  .withColumn("event_index", row_number().over(row_spec))
  .withColumn("events_in_time", count("event_id").over(spec))
  .show(false)
{code}
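The difference between the two frames can be sketched in plain Scala, with no Spark involved (the partition contents below are made up for illustration): a frame ending at the current row yields a running count, while a frame unbounded on both sides yields the partition size on every row.

{code}
object FrameDemo extends App {
  // One hand-made partition, already sorted by the ORDER BY keys.
  val partition = Seq("e0", "e1", "e1", "e2")

  // Default frame when orderBy is present (unboundedPreceding to currentRow):
  // count over it is a running count, one more per row.
  val runningCount = partition.indices.map(_ + 1)

  // Explicit frame (unboundedPreceding to unboundedFollowing):
  // count over it is the partition size for every row.
  val totalCount = partition.map(_ => partition.size)

  println(runningCount.mkString(","))  // 1,2,3,4
  println(totalCount.mkString(","))    // 4,4,4,4
}
{code}

This is why events_in_time climbed 1, 2, 3, ... in the reported output instead of repeating one total per partition.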

> count over Window function with orderBy gives wrong results
> -----------------------------------------------------------
>
>                 Key: SPARK-30089
>                 URL: https://issues.apache.org/jira/browse/SPARK-30089
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Shay Elbaz
>            Priority: Major
>
> Please consider the following data, where *event_id* has 5 non-unique values and *time* is some boolean value:
> {code:java}
> import org.apache.spark.sql.functions._
>
> val df = spark
>   .range(20)
>   .drop("id")
>   .withColumn("event_id", (rand() * 5).cast("int"))
>   .withColumn("secondary_key", rand())
>   .withColumn("time", (rand() * 2).cast("int"))
> {code}
> output:
> {noformat}
> +--------+-------------------+----+
> |event_id|secondary_key      |time|
> +--------+-------------------+----+
> |4       |0.9772771523180686 |0   |
> |2       |0.9334658337212178 |1   |
> |3       |0.19471070128057155|0   |
> |3       |0.7199139320519544 |1   |
> |0       |0.2950226274440527 |0   |
> |1       |0.26756419276811183|0   |
> |0       |0.8505002394080461 |1   |
> |2       |0.43758689359535163|1   |
> |1       |0.9328102324257992 |0   |
> |2       |0.9829272033815031 |0   |
> |3       |0.4579860738704702 |0   |
> |1       |0.9220937240097472 |1   |
> |1       |0.5145520547685413 |0   |
> |2       |0.11314407779922231|0   |
> |2       |0.42837936719991054|1   |
> |3       |0.23501843822326307|1   |
> |2       |0.20053336248248554|0   |
> |3       |0.07781415213387388|0   |
> |3       |0.633004353367962  |0   |
> |3       |0.4071317068782465 |0   |
> +--------+-------------------+----+{noformat}
>  
> Now we would like to get the event index within every *time*, and also the total rows per *time*:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> val spec = Window.partitionBy("time").orderBy("event_id", "secondary_key")
> df
>   .withColumn("event_index", row_number().over(spec))
>   .withColumn("events_in_time", count("event_id").over(spec))
>   .show(false)
> {code}
>  
> It seems that _orderBy_ has some side effect, as per this output (see events_in_time):
> {noformat}
> +--------+-------------------+----+-----------+--------------+
> |event_id|secondary_key      |time|event_index|events_in_time|
> +--------+-------------------+----+-----------+--------------+
> |0       |0.46503911208798054|1   |1          |1             |
> |1       |0.39987355658705015|1   |2          |2             |
> |1       |0.5691951918819504 |1   |3          |3             |
> |2       |0.07400147458165662|1   |4          |4             |
> |2       |0.7592681952170066 |1   |5          |5             |
> |3       |0.02912532019167091|1   |6          |6             |
> |3       |0.8055599468620407 |1   |7          |7             |
> |4       |0.2145552471806751 |1   |8          |8             |
> |4       |0.9898589033586774 |1   |9          |9             |
> |0       |0.39486528440812896|0   |1          |1             |
> |1       |0.2861869575899465 |0   |2          |2             |
> |1       |0.83560556569591   |0   |3          |3             |
> |2       |0.09764393740040855|0   |4          |4             |
> |2       |0.1372111795261538 |0   |5          |5             |
> |2       |0.18723423836738395|0   |6          |6             |
> |2       |0.5326764866419712 |0   |7          |7             |
> |3       |0.93985884066349   |0   |8          |8             |
> |3       |0.9956976178321568 |0   |9          |9             |
> |4       |0.6508676154889343 |0   |10         |10            |
> |4       |0.6664965696641834 |0   |11         |11            |
> +--------+-------------------+----+-----------+--------------+{noformat}
>  
> I expected to see 2 distinct values in events_in_time, 13 and 7 for time=0 and time=1 respectively. *We do get the expected results when omitting orderBy from the window spec,* which led me to believe there _is_ a bug in this plan.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


