spark-user mailing list archives

From Shiyuan <gshy2...@gmail.com>
Subject Re: strange behavior of joining dataframes
Date Sat, 24 Mar 2018 00:29:07 GMT
Here is a simple example that reproduces the problem. This code fails with a
missing attribute ('kk') error. Is it a bug? Note that if the `select`
in line B is removed, the code runs.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([{'score': 1.0, 'ID': 'abc', 'LABEL': True, 'k': 2},
                            {'score': 1.0, 'ID': 'abc', 'LABEL': False, 'k': 3}])

df = df.withColumnRenamed("k", "kk")\
  .select("ID", "score", "LABEL", "kk")    # line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])
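For comparison, the query itself is well defined on this data. Both rows share ID 'abc' but carry two distinct LABEL values, so 'abc' passes the `nL > 1` filter, and each (ID, kk) pair occurs exactly once. The following plain-Python sketch recomputes what the final `df` should contain if the 'kk' resolution error did not occur. It uses no Spark, and the names `labels_per_id`, `kept_ids`, `cnt1` and `result` are illustrative only.

```python
from collections import Counter

# The two sample rows from the repro, with "k" already renamed to "kk".
rows = [
    {"ID": "abc", "score": 1.0, "LABEL": True, "kk": 2},
    {"ID": "abc", "score": 1.0, "LABEL": False, "kk": 3},
]

# df_t: IDs whose rows carry more than one distinct LABEL (countDistinct > 1).
labels_per_id = {}
for r in rows:
    labels_per_id.setdefault(r["ID"], set()).add(r["LABEL"])
kept_ids = {i for i, labels in labels_per_id.items() if len(labels) > 1}

# df.join(df_t.select("ID"), ["ID"]): the inner join keeps rows whose ID passed the filter.
joined = [r for r in rows if r["ID"] in kept_ids]

# df_sw: number of rows per (ID, kk), exposed as "cnt1".
cnt1 = Counter((r["ID"], r["kk"]) for r in joined)

# df.join(df_sw, ["ID", "kk"]): attach the matching cnt1 to every row.
result = [dict(r, cnt1=cnt1[(r["ID"], r["kk"])]) for r in joined]

for r in result:
    print(r)
```

Spark would additionally move the join columns (ID, kk) to the front of the output, but the row contents should be the same: two rows, each with cnt1 = 1.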

On Tue, Mar 20, 2018 at 9:58 PM, Shiyuan <gshy2014@gmail.com> wrote:

> Hi Spark-users:
> I have a dataframe "df_t" that was generated from other dataframes by
> several transformations. Then I did something very simple, just
> counting rows, with the following code:
>
> (A)
> df_t_1 = df_t.groupby(["Id", "key"]).count().withColumnRenamed("count", "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t_3 = df_t_1.join(df_t_2, ["Id"])
> df_t.join(df_t_3, ["Id", "key"])
>
> When I run this query, I get an error that "key" is missing during the
> join. However, the column "key" is clearly in the dataframe df_t. What is
> strange is that if I first do this:
>
> (B)
> data = df_t.collect()
> df_t = spark.createDataFrame(data)
>
> then (A) runs without error. However, (B) should not change the
> dataframe df_t at all. Why does snippet (A) run with (B) but
> fail without it? Also, a different join order completes
> without error:
>
> (C)
> df_t_1 = df_t.groupby(["Id", "key"]).count().withColumnRenamed("count", "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t.join(df_t_1, ["Id", "key"]).join(df_t_2, ["Id"])
>
> But (A) and (C) are conceptually the same and should produce the same
> result. What could possibly go wrong here? Any hints for tracking down
> the problem are appreciated. I am using Spark 2.1.
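The claim that (A) and (C) are equivalent can be checked on toy data. Both are inner equi-joins of df_t with the same two aggregates, associated differently, so they should yield the same rows up to column and row order. The sketch below is plain Python, not Spark. The sample rows and the `inner_join` and `canonical` helpers are hypothetical, chosen only to illustrate the relational semantics. It suggests that the failure of (A) in Spark is not explained by the logic of the query itself.

```python
from collections import Counter

# Hypothetical stand-in for df_t: only the join columns matter here.
df_t = [
    {"Id": 1, "key": "a"},
    {"Id": 1, "key": "a"},
    {"Id": 1, "key": "b"},
    {"Id": 2, "key": "c"},
]


def inner_join(left, right, on):
    """Inner equi-join of two lists of dicts on the column names in `on`."""
    out = []
    for l in left:
        for r in right:
            if all(l[c] == r[c] for c in on):
                out.append({**l, **r})
    return out


# df_t_1: row count per (Id, key) as cnt1; df_t_2: row count per Id as cnt2.
df_t_1 = [{"Id": i, "key": k, "cnt1": n}
          for (i, k), n in Counter((r["Id"], r["key"]) for r in df_t).items()]
df_t_2 = [{"Id": i, "cnt2": n}
          for i, n in Counter(r["Id"] for r in df_t).items()]

# (A): join the two aggregates first, then join the result back to df_t.
result_a = inner_join(df_t, inner_join(df_t_1, df_t_2, ["Id"]), ["Id", "key"])

# (C): join df_t to each aggregate in turn.
result_c = inner_join(inner_join(df_t, df_t_1, ["Id", "key"]), df_t_2, ["Id"])


def canonical(rows):
    """Order-insensitive view of a result: sorted tuples of sorted (column, value) pairs."""
    return sorted(tuple(sorted(r.items())) for r in rows)


print(canonical(result_a) == canonical(result_c))  # → True
```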
