spark-user mailing list archives

From Dhaval Patel <mailto.dhava...@gmail.com>
Subject Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"
Date Sat, 14 Sep 2019 21:58:46 GMT
Hi Abhinesh,

As dropDuplicates keeps the first record, you can add a marker column to the
1st and 2nd DataFrames (for example 1 for the 1st and 2 for the 2nd) and then:
union -> sort on that marker -> drop duplicates.
This will ensure records from the 1st DataFrame are kept and those from the
2nd are dropped.

Regards
Dhaval
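
A plain-Python model of the tag, union, sort, and drop-duplicates steps described above may help make the intended "keep first" rule concrete. It is a sketch, not Spark code. The function name `union_keep_first` and the tag values 1 and 2 are hypothetical, and the sample rows reuse the ids from the test case quoted further down. One caveat for the Spark version: the `dropDuplicates` documentation does not promise which row of a duplicate group survives, nor that it respects a preceding sort. A more deterministic way to express "keep first" in Spark is a window with `row_number()` partitioned by the key and ordered by the tag, keeping only row 1.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (id, payload)


def union_keep_first(df1: List[Row], df2: List[Row]) -> List[Row]:
    """Model of: tag each source, union, sort on the tag, keep first row per id."""
    # Tag every row with its source (hypothetical values): 1 = first, 2 = second.
    tagged = [(1, row) for row in df1] + [(2, row) for row in df2]
    # Sort on the tag; Python's sort is stable, so first-DataFrame rows come first.
    tagged.sort(key=lambda t: t[0])
    kept: Dict[int, Row] = {}
    for _tag, row in tagged:
        kept.setdefault(row[0], row)  # only the first row seen for each id is kept
    return sorted(kept.values())  # order by id for readable output


df1 = [(0, "dataset 1 record 1"), (2, "dataset 1 record 2"), (4, "dataset 1 record 3"),
       (6, "dataset 1 record 4"), (8, "dataset 1 record 5")]
df2 = [(0, "dataset 2 record 1"), (3, "dataset 2 record 2"), (6, "dataset 2 record 3"),
       (9, "dataset 2 record 4"), (12, "dataset 2 record 5")]

for row in union_keep_first(df1, df2):
    print(row)
```

Ids 0 and 6 appear in both inputs; the model keeps the dataset 1 rows for them and passes ids 3, 9 and 12 through from dataset 2.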

On Sat, Sep 14, 2019 at 4:41 PM Abhinesh Hada <abhineshada@gmail.com> wrote:

> Hey Nathan,
>
> As the dataset is very large, I am looking for approaches that involve as
> few joins as possible. I will give your approach a try.
> Thanks a lot for your help.
>
> On Sat, Sep 14, 2019 at 12:58 AM Nathan Kronenfeld <nkronenfeld@uncharted.software> wrote:
>
>> It's a bit of a pain, but you could just use an outer join (assuming
>> there are no duplicates in the input datasets, of course):
>>
>> import org.apache.spark.sql.test.SharedSparkSession
>> import org.scalatest.FunSpec
>>
>> class QuestionSpec extends FunSpec with SharedSparkSession {
>>   describe("spark list question") {
>>     it("should join based on id with one row only per id, based on the first dataset") {
>>       import testImplicits._
>>       import org.apache.spark.sql.functions.when
>>
>>       val ds1 = spark.createDataFrame(Seq(
>>         QuestionRecord(0, "dataset 1 record 1"),
>>         QuestionRecord(2, "dataset 1 record 2"),
>>         QuestionRecord(4, "dataset 1 record 3"),
>>         QuestionRecord(6, "dataset 1 record 4"),
>>         QuestionRecord(8, "dataset 1 record 5")
>>       ))
>>       val ds2 = spark.createDataFrame(Seq(
>>         QuestionRecord(0, "dataset 2 record 1"),
>>         QuestionRecord(3, "dataset 2 record 2"),
>>         QuestionRecord(6, "dataset 2 record 3"),
>>         QuestionRecord(9, "dataset 2 record 4"),
>>         QuestionRecord(12, "dataset 2 record 5")
>>       ))
>>
>>       val allColumns = ds1.columns
>>
>>       // Merge the datasets
>>       val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")
>>
>>       // Form new columns with the required value
>>       val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
>>         ds.withColumn(s"new_$nextColName",
>>           when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
>>       }
>>
>>       // Drop old columns
>>       val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
>>         ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
>>       }.drop("id")
>>
>>       // And get rid of our new_ marker
>>       val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
>>         ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
>>       }
>>
>>       ds6.show()
>>     }
>>   }
>> }
>>
>> case class QuestionRecord (id: Int, payload: String)
>>
>> On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada <abhineshada@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to take the union of two DataFrames and then drop duplicates
>>> based on the value of a specific column. However, I want to make sure that,
>>> when dropping duplicates, the rows from the first DataFrame are kept.
>>>
>>> Example:
>>> df1 = df1.union(df2).dropDuplicates(['id'])
>>>
>>>
>>>
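
The outer-join approach quoted above keeps dataset 1's values whenever its `id` is non-null and falls back to dataset 2 otherwise. A plain-Python sketch of that selection rule follows. It is not Spark code: `outer_join_prefer_first` is a hypothetical name, and ids are assumed unique within each input, as the reply itself assumes.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, str]  # (id, payload)


def outer_join_prefer_first(ds1: List[Row], ds2: List[Row]) -> List[Row]:
    """Model of a full outer join on id that prefers dataset 1's columns."""
    # Index each input by id; this assumes ids are unique within each input.
    left: Dict[int, Row] = {row[0]: row for row in ds1}
    right: Dict[int, Row] = {row[0]: row for row in ds2}
    result: List[Row] = []
    # Full outer join: visit every id that appears in either input.
    for key in sorted(left.keys() | right.keys()):
        from_ds1: Optional[Row] = left.get(key)
        # Mirrors when(ds1("id").isNotNull, ds1(col)).otherwise(ds2(col)).
        result.append(from_ds1 if from_ds1 is not None else right[key])
    return result


ds1 = [(0, "dataset 1 record 1"), (2, "dataset 1 record 2"), (4, "dataset 1 record 3"),
       (6, "dataset 1 record 4"), (8, "dataset 1 record 5")]
ds2 = [(0, "dataset 2 record 1"), (3, "dataset 2 record 2"), (6, "dataset 2 record 3"),
       (9, "dataset 2 record 4"), (12, "dataset 2 record 5")]

for row in outer_join_prefer_first(ds1, ds2):
    print(row)
```

Unlike the union-based approach, this needs a join, which is the cost the original poster wanted to avoid on a very large dataset. In exchange, which row wins is decided by an explicit condition rather than by row order.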
