spark-user mailing list archives

From Abdeali Kothari <abdealikoth...@gmail.com>
Subject Re: dropDuplicate on timestamp based column unexpected output
Date Thu, 04 Apr 2019 05:11:15 GMT
I've faced this issue too, and a colleague pointed me to the documentation:
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
The dropDuplicates docs do not guarantee that the "first" record is
returned, even if you sort your DataFrame beforehand. It may return any
one of the duplicate records; it only ensures that no duplicates remain.
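To illustrate the point (plain Python, not Spark, with invented data): a
dedup that keeps "whichever row it sees first" -- which is all that is
promised -- returns a different survivor depending on how the rows happen
to be ordered, e.g. after a shuffle:

from itertools import groupby  # not needed; simple loop below

# Same two rows for one invoice, in two different physical orders.
rows_a = [('inv1', 10), ('inv1', 30)]
rows_b = [('inv1', 30), ('inv1', 10)]

def dedup_by_invoice(rows):
    # Keep the first row seen per invoice_id, like dropDuplicates
    # with no ordering guarantee.
    seen, out = set(), []
    for inv, t in rows:
        if inv not in seen:
            seen.add(inv)
            out.append((inv, t))
    return out

print(dedup_by_invoice(rows_a))  # [('inv1', 10)]
print(dedup_by_invoice(rows_b))  # [('inv1', 30)] -- different survivor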

The only way I know of to pick a specific record is what you did, but you
can avoid sorting inside each partition with something like this (in pyspark):

from pyspark.sql import Window, functions as F

df = df.withColumn(
    'wanted_time',
    F.min('update_time').over(Window.partitionBy('invoice_id')))
out_df = (df.filter(df['update_time'] == df['wanted_time'])
            .drop('wanted_time')
            .dropDuplicates(['invoice_id', 'update_time']))

The min() is faster than doing an orderBy() plus a row_number().
And the dropDuplicates at the end handles ties: if two records share the
same minimum 'update_time' for an invoice, only one of them survives.
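For clarity, here is a pure-Python analogue of the two steps above
(illustration only, not Spark code; the sample rows and the 'amount'
column are invented, but the column names match this thread):

# Step 1: compute min('update_time') per 'invoice_id' (the window min).
rows = [
    {'invoice_id': 1, 'update_time': 30, 'amount': 'c'},
    {'invoice_id': 1, 'update_time': 10, 'amount': 'a'},
    {'invoice_id': 1, 'update_time': 10, 'amount': 'b'},  # tie on min time
    {'invoice_id': 2, 'update_time': 20, 'amount': 'd'},
]
wanted_time = {}
for r in rows:
    inv = r['invoice_id']
    wanted_time[inv] = min(wanted_time.get(inv, r['update_time']),
                           r['update_time'])

# Step 2: keep only rows at the minimum time, then drop duplicate
# (invoice_id, update_time) pairs -- ties keep a single arbitrary row.
seen, out = set(), []
for r in rows:
    if r['update_time'] != wanted_time[r['invoice_id']]:
        continue
    key = (r['invoice_id'], r['update_time'])
    if key not in seen:
        seen.add(key)
        out.append(r)

print(out)  # exactly one row per invoice, at its earliest update_time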


On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <chetan.opensource@gmail.com>
wrote:

> Hello Dear Spark Users,
>
> I am using dropDuplicates on a DataFrame generated from a large parquet
> file (from HDFS), deduplicating on a timestamp-based column. Every time
> I run it, different rows are dropped for the same timestamp.
>
> What I tried and worked
>
> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)
>
> val irqDistinctDF = irqFilteredDF.withColumn("rn", row_number.over(wSpec))
>   .where($"rn" === 1).drop("rn").drop("update_time")
>
> But this is damn slow...
>
> Can someone please shed some light?
>
> Thanks
>
>
