spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: PySpark order-only window function issue
Date Thu, 13 Aug 2015 03:42:38 GMT
This looks like a bug. Please go ahead and open a JIRA for it, thanks!

On Tue, Aug 11, 2015 at 6:41 AM, Maciej Szymkiewicz
<mszymkiewicz@gmail.com> wrote:
> Hello everyone,
>
> I am trying to use the PySpark API with window functions without specifying
> a partition clause. I mean something equivalent to this
>
> SELECT v, row_number() OVER (ORDER BY v) AS rn FROM df
>
> in SQL. I am not sure if I am doing something wrong or if it is a bug, but
> the results are far from what I expect. Let's assume we have data as follows:
>
> from pyspark.sql.window import Window
> from pyspark.sql import functions as f
>
> df = sqlContext.createDataFrame(
>     zip(["foo"] * 5 + ["bar"] * 5, range(1, 6) + range(6, 11)),
>     ("k", "v")
> ).withColumn("dummy", f.lit(1))
>
> df.registerTempTable("df")
> df.show()
>
> +---+--+-----+
> |  k| v|dummy|
> +---+--+-----+
> |foo| 1|    1|
> |foo| 2|    1|
> |foo| 3|    1|
> |foo| 4|    1|
> |foo| 5|    1|
> |bar| 6|    1|
> |bar| 7|    1|
> |bar| 8|    1|
> |bar| 9|    1|
> |bar|10|    1|
> +---+--+-----+
>
> When I use the following SQL query
>
> sql_ord = """SELECT k, v, row_number() OVER (
>         ORDER BY v
>     ) AS rn FROM df"""
>
> sqlContext.sql(sql_ord).show()
>
> I get the expected results:
>
> +---+--+--+
> |  k| v|rn|
> +---+--+--+
> |foo| 1| 1|
> |foo| 2| 2|
> |foo| 3| 3|
> |foo| 4| 4|
> |foo| 5| 5|
> |bar| 6| 6|
> |bar| 7| 7|
> |bar| 8| 8|
> |bar| 9| 9|
> |bar|10|10|
> +---+--+--+
>
> but when I try to define the same window using the Python API
>
> w_ord = Window.orderBy("v")
> df.select("k", "v", f.rowNumber().over(w_ord).alias("avg")).show()
>
> I get results like this:
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|  1|
> |foo| 2|  1|
> |foo| 3|  1|
> |foo| 4|  1|
> |foo| 5|  1|
> |bar| 6|  1|
> |bar| 7|  1|
> |bar| 8|  1|
> |bar| 9|  1|
> |bar|10|  1|
> +---+--+---+
>
> When I specify both partition and order
>
> w_part_ord = Window.partitionBy("dummy").orderBy("v")
> df.select("k", "v", f.rowNumber().over(w_part_ord).alias("avg")).show()
>
> everything works as I expect:
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|  1|
> |foo| 2|  2|
> |foo| 3|  3|
> |foo| 4|  4|
> |foo| 5|  5|
> |bar| 6|  6|
> |bar| 7|  7|
> |bar| 8|  8|
> |bar| 9|  9|
> |bar|10| 10|
> +---+--+---+
>
> Another example of similar behavior, with the correct SQL result:
>
> sql_ord_rng = """SELECT k, v, avg(v) OVER (
>         ORDER BY v
>         ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
>     ) AS avg FROM df"""
> sqlContext.sql(sql_ord_rng).show()
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|1.5|
> |foo| 2|2.0|
> |foo| 3|3.0|
> |foo| 4|4.0|
> |foo| 5|5.0|
> |bar| 6|6.0|
> |bar| 7|7.0|
> |bar| 8|8.0|
> |bar| 9|9.0|
> |bar|10|9.5|
> +---+--+---+
>
> and the incorrect PySpark result:
>
> w_ord_rng = Window.orderBy("v").rowsBetween(-1, 1)
> df.select("k", "v", f.avg("v").over(w_ord_rng).alias("avg")).show()
>
> +---+--+----+
> |  k| v| avg|
> +---+--+----+
> |foo| 1| 1.0|
> |foo| 2| 2.0|
> |foo| 3| 3.0|
> |foo| 4| 4.0|
> |foo| 5| 5.0|
> |bar| 6| 6.0|
> |bar| 7| 7.0|
> |bar| 8| 8.0|
> |bar| 9| 9.0|
> |bar|10|10.0|
> +---+--+----+
>
> As before, adding a dummy partition solves the problem:
>
> w_part_ord_rng = Window.partitionBy("dummy").orderBy("v").rowsBetween(-1, 1)
> df.select("k", "v", f.avg("v").over(w_part_ord_rng).alias("avg")).show()
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|1.5|
> |foo| 2|2.0|
> |foo| 3|3.0|
> |foo| 4|4.0|
> |foo| 5|5.0|
> |bar| 6|6.0|
> |bar| 7|7.0|
> |bar| 8|8.0|
> |bar| 9|9.0|
> |bar|10|9.5|
> +---+--+---+
>
> I've checked the window function tests
> (https://github.com/apache/spark/blob/ac507a03c3371cd5404ca195ee0ba0306badfc23/python/pyspark/sql/tests.py#L1105)
> but these cover only the partition + order case.
>
> Is there something wrong with my window definitions, or should I open
> a JIRA issue?
>
> Environment:
>
> - Debian GNU/Linux
> - Spark 1.4.1
> - Python 2.7.9
> - OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-1~deb8u1)
>
> --
> Best,
> Maciej
>
>
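For reference when writing a regression test, here is a minimal plain-Python sketch of what the two order-only windows in the quoted message are expected to return. It does not use Spark at all, and the helper names (row_number_over_order, avg_over_rows) are hypothetical, introduced only for illustration. It assumes the whole dataset forms a single partition, which is what an OVER clause with only ORDER BY means in SQL.

```python
# Plain-Python reference for the two order-only windows quoted above.
# No Spark involved; helper names are hypothetical, for illustration only.

def row_number_over_order(values):
    """Emulate row_number() OVER (ORDER BY v) on a single partition."""
    # Rows are numbered 1..n in ascending order of v.
    return [(v, i) for i, v in enumerate(sorted(values), start=1)]


def avg_over_rows(values, preceding=1, following=1):
    """Emulate avg(v) OVER (ORDER BY v ROWS BETWEEN n PRECEDING AND m FOLLOWING)."""
    ordered = sorted(values)
    result = []
    for i, v in enumerate(ordered):
        # Clip the frame at the partition boundaries.
        lo = max(0, i - preceding)
        hi = min(len(ordered), i + following + 1)
        frame = ordered[lo:hi]
        result.append((v, float(sum(frame)) / len(frame)))
    return result


vs = list(range(1, 11))          # same v column as in the quoted example
rn = row_number_over_order(vs)   # (v, rn) pairs
avgs = avg_over_rows(vs)         # (v, avg) pairs
```

With the v column from the example, rn pairs each value with its position and avgs starts at 1.5 and ends at 9.5, which matches the SQL output quoted above rather than the order-only PySpark output.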


