spark-issues mailing list archives

From "Xiao Li (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
Date Tue, 01 Dec 2015 02:16:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032939#comment-15032939 ]

Xiao Li commented on SPARK-12030:
---------------------------------

Let me post a simple test case that triggers the data corruption. The data set t1 was downloaded
from this JIRA.

{code}
test("sort result") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
    val t1test = sqlContext.read.parquet("/Users/xiaoli/Downloads/t1")
      .dropDuplicates()
      .where("fk1=39 or (fk1=525 and id1 < 664618 and id1 >= 470050)")
      .repartition(1)
      .cache()

    // t1test.orderBy("fk1").explain(true)
    val t1 = t1test.orderBy("fk1").cache()

    checkAnswer(t1test, t1.collect())
  }
}
{code}

I am not sure if you can see the mismatch. I am unable to reproduce it on my ThinkPad, but
I can easily reproduce it on my MacBook.

My case did not hit any exception, but I saw data corruption. After sorting, one row [664615,525]
is replaced by another row [664611,525]. Thus one row disappears after sorting, but a duplicate
appears in another row. The total number of rows is unchanged after the sort.
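This kind of mismatch is invisible to a plain row-count check and only shows up when the two result sets are compared as multisets. A minimal sketch of that comparison, using plain Python tuples in place of Spark rows (the row values are the ones observed above; the surrounding rows are made up for illustration):

```python
from collections import Counter

# Rows as (id1, fk1) tuples. "expected" is the unsorted result;
# "actual" is the result after orderBy, where [664615, 525] has been
# silently replaced by a duplicate of [664611, 525].
expected = [(664611, 525), (664615, 525), (470050, 525), (39, 39)]
actual   = [(664611, 525), (664611, 525), (470050, 525), (39, 39)]

# A plain count comparison does not notice the corruption ...
assert len(expected) == len(actual)

# ... but a multiset comparison does: one row is missing from the
# sorted result, and another row appears twice.
missing    = Counter(expected) - Counter(actual)
duplicated = Counter(actual) - Counter(expected)
print(missing)     # rows lost after the sort
print(duplicated)  # rows duplicated after the sort
```

This is essentially what checkAnswer does, which is why the test above fails even though both sides have the same row count.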

> Incorrect results when aggregate joined data
> --------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej BryƄski
>            Priority: Blocker
>         Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have the following issue.
> I created 2 DataFrames from JDBC (MySQL) and joined them (t1 has a foreign key fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both tables are cached, so results should be the same on every query.
> Then I did some counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here the magic begins - I counted distinct id1 from the joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results vary *(they are different on every run)* between 5899000 and
> 5900000, but are never equal to 5900729.
> In addition, I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 1").collect()
> {code}
> This gives some results, but for each such id1 the following query returns *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong?
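For context on why the varying counts in the quoted report look like corruption: in a left outer join, every left-side row appears at least once, so the set of distinct id1 values in the joined result must equal the set in t1. A minimal sketch of that invariant, with plain Python collections standing in for the cached DataFrames (the table contents are made up for illustration, and t2 is keyed by a unique id2):

```python
# Toy stand-ins for t1 rows (id1, fk1) and t2 keyed by id2.
t1 = [(1, 10), (2, 10), (3, 99)]   # fk1=99 has no match in t2
t2 = {10: "a", 20: "b"}            # id2 -> payload

# Left outer join: keep every t1 row; unmatched rows get None.
joined = [(id1, fk1, t2.get(fk1)) for (id1, fk1) in t1]

# The invariant the reporter relies on: the distinct id1 values
# are preserved by the join, so the distinct counts must agree.
distinct_t1 = {id1 for (id1, _) in t1}
distinct_joined = {id1 for (id1, _, _) in joined}
assert distinct_joined == distinct_t1
```

Since both inputs are cached, a `select distinct(id1)` over the joined table that fluctuates between runs and never reaches the t1 count cannot be explained by join semantics; some left-side rows are being lost or overwritten, consistent with the sort corruption shown above.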



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
