spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nathan Kronenfeld <nkronenf...@uncharted.software>
Subject Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"
Date Fri, 13 Sep 2019 19:28:06 GMT
It's a bit of a pain, but you could just use an outer join (assuming there
are no duplicates in the input datasets, of course):

import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.FunSpec

class QuestionSpec extends FunSpec with SharedSparkSession {
  describe("spark list question") {
    it("should join based on id with one row only per id, based on the
first dataset") {
      import testImplicits._
      import org.apache.spark.sql.functions.when

      val ds1 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 1 record 1"),
        QuestionRecord(2, "dataset 1 record 2"),
        QuestionRecord(4, "dataset 1 record 3"),
        QuestionRecord(6, "dataset 1 record 4"),
        QuestionRecord(8, "dataset 1 record 5")
      ))
      val ds2 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 2 record 1"),
        QuestionRecord(3, "dataset 2 record 2"),
        QuestionRecord(6, "dataset 2 record 3"),
        QuestionRecord(9, "dataset 2 record 4"),
        QuestionRecord(12, "dataset 2 record 5")
      ))

      val allColumns = ds1.columns

      // Merge the datasets
      val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")

      // Form new columns with the required value
      val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
        ds.withColumn(s"new_$nextColName", when(ds1("id").isNotNull,
ds1(nextColName)).otherwise(ds2(nextColName)))
      }

      // Drop old columns
      val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
        ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
      }.drop("id")

      // And get rid of our new_ marker
      val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
          ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
      }

      ds6.show()
    }
  }
}

case class QuestionRecord (id: Int, payload: String)

On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada <abhineshada@gmail.com>
wrote:

> Hi,
>
> I am trying to take union of 2 dataframes and then drop duplicate based on
> the value of a specific column. But, I want to make sure that while
> dropping duplicates, the rows from first data frame are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
>
>
>

Mime
View raw message