spark-user mailing list archives

From 李斌松 <libinsong1...@gmail.com>
Subject spark 2.3 dataframe join bug
Date Mon, 26 Mar 2018 13:18:20 GMT
Hi, Spark users:
     I'm using Spark 2.3 and have found a bug in the Spark DataFrame API. Here is my code:

        from pyspark.sql import SparkSession
        sparkSession = SparkSession.builder.getOrCreate()
        sc = sparkSession.sparkContext

        # build a small source table and cache it
        tmp = sparkSession.createDataFrame(
            sc.parallelize([[1, 2, 3, 4], [1, 2, 5, 6], [2, 3, 4, 5], [2, 3, 5, 6]])
        ).toDF('a', 'b', 'c', 'd')
        tmp.createOrReplaceTempView('tdl_spark_test')
        sparkSession.sql('cache table tdl_spark_test')

        df = sparkSession.sql('select a, b from tdl_spark_test group by a, b')
        df.printSchema()

        # first child dataframe: collect_set over array(c)
        df1 = sparkSession.sql(
            'select a, b, collect_set(array(c)) as c from tdl_spark_test group by a, b')
        df1 = df1.withColumnRenamed('a', 'a1').withColumnRenamed('b', 'b1')
        cond = [df.a == df1.a1, df.b == df1.b1]
        df = df.join(df1, cond, 'inner').drop('a1', 'b1')

        # second child dataframe: collect_set over array(d)
        df2 = sparkSession.sql(
            'select a, b, collect_set(array(d)) as d from tdl_spark_test group by a, b')
        df2 = df2.withColumnRenamed('a', 'a1').withColumnRenamed('b', 'b1')
        cond = [df.a == df2.a1, df.b == df2.b1]
        df = df.join(df2, cond, 'inner').drop('a1', 'b1')

        df.show()
        sparkSession.sql('uncache table tdl_spark_test')


        As you can see, the above code just creates one DataFrame and joins it with two child DataFrames. The expected output is:
        +---+---+----------+----------+
        |  a|  b|         c|         d|
        +---+---+----------+----------+
        |  2|  3|[[5], [4]]|[[5], [6]]|
        |  1|  2|[[5], [3]]|[[6], [4]]|
        +---+---+----------+----------+

        However, we got this unexpected output:
        +---+---+----------+----------+
        |  a|  b|         c|         d|
        +---+---+----------+----------+
        |  2|  3|[[5], [4]]|[[5], [4]]|
        |  1|  2|[[5], [3]]|[[5], [3]]|
        +---+---+----------+----------+

         It seems that the column of the first child DataFrame has overwritten the column of the second child DataFrame: column d carries the values of column c.
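
         One way to confirm this (a diagnostic sketch on our side, assuming the same session as above) is to print the extended plan of the final DataFrame and check which aggregate the d column actually comes from:

        # Diagnostic sketch: DataFrame.explain(True) prints the parsed,
        # analyzed, optimized and physical plans. When the bug triggers,
        # the plans show the second join reading the same cached aggregate
        # as the first one, which is why 'd' carries the values of 'c'.
        df.explain(True)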

         In addition, this error occurs only when all of the following conditions hold at the same time:
         1. the root table is cached;
         2. "group by" is used in the child DataFrames;
         3. "array" is wrapped in "collect_set" in the child DataFrames;
         4. the join condition is "df.a==df2.a1, df.b==df2.b1" instead of
"['a', 'b']" (see the workaround sketch after this list).
