spark-user mailing list archives

From Chetan Khatri <chetan.opensou...@gmail.com>
Subject Re: dropDuplicate on timestamp based column unexpected output
Date Thu, 04 Apr 2019 06:15:48 GMT
Hello Abdeali, thank you for your response.

Can you please explain this line to me: "And the dropDuplicates at the end
ensures that rows tied at the same 'update_time' don't cause issues."

Sorry, I didn't get it at first. :)

On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <abdealikothari@gmail.com>
wrote:

> I've faced this issue too, and a colleague pointed me to the
> documentation:
> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
> The dropDuplicates docs do not guarantee that it returns the "first"
> record (even if you sort your DataFrame).
> It can return any record it finds; it only ensures that duplicates are
> not present.
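>
> To illustrate, here is a minimal sketch with made-up toy data (the values
> are hypothetical; 'invoice_id' and 'update_time' mirror your columns):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.getOrCreate()
> # Two rows share invoice_id=1; which one survives dropDuplicates is not
> # guaranteed, even though the DataFrame is sorted first.
> toy = spark.createDataFrame(
>     [(1, "2019-01-01"), (1, "2019-01-02"), (2, "2019-01-03")],
>     ["invoice_id", "update_time"])
> toy.orderBy("update_time").dropDuplicates(["invoice_id"]).show()
> # May keep either 2019-01-01 or 2019-01-02 for invoice_id=1.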
>
> The only way I know of to do this is what you did, but you can avoid the
> sort inside the partition with something like this (in PySpark):
>
> from pyspark.sql import Window, functions as F
>
> # Tag each row with the earliest update_time in its invoice_id group.
> df = df.withColumn(
>     'wanted_time',
>     F.min('update_time').over(Window.partitionBy('invoice_id')))
> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>             .drop('wanted_time')
>             .dropDuplicates(['invoice_id', 'update_time']))
>
> The min() is faster than doing an orderBy() and a row_number().
> And the dropDuplicates at the end ensures that rows tied at the same
> 'update_time' don't cause issues.
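>
> Concretely (a toy sketch; the data and the 'payload' column are made up):
>
> from pyspark.sql import SparkSession, Window, functions as F
>
> spark = SparkSession.builder.getOrCreate()
> # Both rows tie at the minimum update_time for invoice_id=1, so the
> # filter alone keeps both; dropDuplicates collapses them to one row.
> tie = spark.createDataFrame(
>     [(1, "2019-01-01", "a"), (1, "2019-01-01", "b")],
>     ["invoice_id", "update_time", "payload"])
> tie = tie.withColumn(
>     'wanted_time',
>     F.min('update_time').over(Window.partitionBy('invoice_id')))
> (tie.filter(tie['update_time'] == tie['wanted_time'])
>     .drop('wanted_time')
>     .dropDuplicates(['invoice_id', 'update_time'])
>     .show())  # exactly one row for invoice_id=1 ("a" or "b")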
>
>
> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <chetan.opensource@gmail.com>
> wrote:
>
>> Hello Dear Spark Users,
>>
>> I am using dropDuplicates on a DataFrame generated from a large Parquet
>> file (from HDFS), de-duplicating on a timestamp-based column, and every
>> time I run it, it drops different rows for the same timestamp.
>>
>> What I tried, and what worked:
>>
>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)
>>
>> val irqDistinctDF = irqFilteredDF
>>   .withColumn("rn", row_number.over(wSpec))
>>   .where($"rn" === 1)
>>   .drop("rn")
>>   .drop("update_time")
>>
>> But this is damn slow...
>>
>> Can someone please throw some light on this?
>>
>> Thanks
>>
>>
