spark-user mailing list archives

From Abhinesh Hada <abhinesh...@gmail.com>
Subject Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"
Date Sat, 14 Sep 2019 20:41:02 GMT
Hey Nathan,

As the dataset is very large, I am looking for approaches that involve as
few joins as possible. I will give your approach a try.
Thanks a lot for your help.

On Sat, Sep 14, 2019 at 12:58 AM Nathan Kronenfeld
<nkronenfeld@uncharted.software> wrote:

> It's a bit of a pain, but you could just use an outer join (assuming no id
> appears more than once within either input dataset, of course):
>
> import org.apache.spark.sql.test.SharedSparkSession
> import org.scalatest.FunSpec
>
> class QuestionSpec extends FunSpec with SharedSparkSession {
>   describe("spark list question") {
>     it("should join based on id with one row only per id, based on the first dataset") {
>       import testImplicits._
>       import org.apache.spark.sql.functions.when
>
>       val ds1 = spark.createDataFrame(Seq(
>         QuestionRecord(0, "dataset 1 record 1"),
>         QuestionRecord(2, "dataset 1 record 2"),
>         QuestionRecord(4, "dataset 1 record 3"),
>         QuestionRecord(6, "dataset 1 record 4"),
>         QuestionRecord(8, "dataset 1 record 5")
>       ))
>       val ds2 = spark.createDataFrame(Seq(
>         QuestionRecord(0, "dataset 2 record 1"),
>         QuestionRecord(3, "dataset 2 record 2"),
>         QuestionRecord(6, "dataset 2 record 3"),
>         QuestionRecord(9, "dataset 2 record 4"),
>         QuestionRecord(12, "dataset 2 record 5")
>       ))
>
>       val allColumns = ds1.columns
>
>       // Merge the datasets
>       val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")
>
>       // For each column, take ds1's value when ds1 has a row for this id,
>       // otherwise ds2's value; store it under a new_ prefix for now
>       val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
>         ds.withColumn(s"new_$nextColName",
>           when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
>       }
>
>       // Drop old columns
>       val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
>         ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
>       }.drop("id")
>
>       // And get rid of our new_ marker
>       val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
>         ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
>       }
>
>       ds6.show()
>     }
>   }
> }
>
> case class QuestionRecord (id: Int, payload: String)
>
> On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada <abhineshada@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to take the union of two DataFrames and then drop duplicates
>> based on the value of a specific column. However, I want to make sure
>> that, when dropping duplicates, the rows from the first DataFrame are kept.
>>
>> Example:
>> df1 = df1.union(df2).dropDuplicates(['id'])
