spark-user mailing list archives

From Jason Nerothin <jasonnerot...@gmail.com>
Subject Re: dropDuplicate on timestamp based column unexpected output
Date Thu, 04 Apr 2019 13:46:28 GMT
How much memory do you have per partition?

On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri <chetan.opensource@gmail.com>
wrote:

> I will get the information and will share with you.
>
> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari <abdealikothari@gmail.com>
> wrote:
>
>> How long does the window solution take? (Also mention how many executors
>> your Spark application was using on average during that time.)
>> I am not aware of anything that is faster. When I ran it on my data
>> (~8-9 GB), I think it took less than 5 minutes (I don't remember the exact time).
>>
>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <chetan.opensource@gmail.com>
>> wrote:
>>
>>> Thanks for the awesome clarification / explanation.
>>>
>>> I have cases where update_time can be the same.
>>> I need suggestions: on large data (about 5 GB), the window-based
>>> solution I mentioned takes a very long time.
>>>
>>> Thanks again.
>>>
>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <
>>> abdealikothari@gmail.com> wrote:
>>>
>>>> So, the above code for min() worked fine for me in general, but it
>>>> failed in one corner case, when I have something like:
>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>
>>>> In this example, the update_time for 2 records is exactly the same. So
>>>> filtering on the min() alone returns 2 records for invoice_id=1.
>>>> Your row_num snippet avoids this, because 2 rows in the same partition
>>>> never both get row_num = 1.
>>>>
>>>> But note that here row_num=1 and row_num=2 are assigned in arbitrary
>>>> order (because the orderBy is on update_time and both rows have the same
>>>> update_time).
>>>> Hence dropDuplicates can be used there, because either one of those
>>>> rows is acceptable.
>>>>
>>>> Overall, dropDuplicates seems meant for cases where you literally have
>>>> redundant, duplicated data, not for filtering to get the first/last
>>>> record etc.
>>>>
>>>>
>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>>> chetan.opensource@gmail.com> wrote:
>>>>
>>>>> Hello Abdeali, thank you for your response.
>>>>>
>>>>> Can you please explain this line to me: "And the dropDuplicates at the
>>>>> end ensures records with two values for the same 'update_time' don't
>>>>> cause issues."
>>>>>
>>>>> Sorry, I didn't get it right away. :)
>>>>>
>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>>> abdealikothari@gmail.com> wrote:
>>>>>
>>>>>> I've faced this issue too, and a colleague pointed me to the
>>>>>> documentation:
>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>> The dropDuplicates docs do not guarantee that it returns the "first"
>>>>>> record (even if you sort your dataframe).
>>>>>> It returns any one record it finds and only ensures that no duplicates
>>>>>> remain.
>>>>>>
>>>>>> The only way I know to do this is what you did, but you can avoid the
>>>>>> sorting inside the partition with something like (in pyspark):
>>>>>>
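>>>>>> from pyspark.sql import Window, functions as F
>>>>>> # Earliest update_time per invoice, computed without sorting the partition.
>>>>>> df = df.withColumn('wanted_time',
>>>>>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>>> # Parentheses let the chained calls span lines; dropDuplicates takes a list.
>>>>>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>>>>>     .drop('wanted_time').dropDuplicates(['invoice_id', 'update_time']))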
>>>>>>
>>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>>> And the dropDuplicates at the end ensures records with two values for
>>>>>> the same 'update_time' don't cause issues.
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>>> chetan.opensource@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Dear Spark Users,
>>>>>>>
>>>>>>> I am using dropDuplicates on a DataFrame generated from a large
>>>>>>> parquet file (from HDFS), deduplicating on a timestamp-based column.
>>>>>>> Every time I run it, it drops different rows among those that share
>>>>>>> the same timestamp.
>>>>>>>
>>>>>>> What I tried, and what worked:
>>>>>>>
>>>>>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)
>>>>>>>
>>>>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>>>>> row_number.over(wSpec)).where($"rn" === 1)
>>>>>>> .drop("rn").drop("update_time")
>>>>>>>
>>>>>>> But this is damn slow...
>>>>>>>
>>>>>>> Can someone please shed some light on this?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
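
The tie case discussed in the thread can be modeled in plain Python, without
Spark. The sketch below is only an illustration: the sample rows, the line_no
tie-breaker column, and the helper names are hypothetical, and it mimics the
semantics of a min() filter and of a fully specified ordering rather than
calling any Spark API.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows; line_no is an assumed extra column that is
# unique within an invoice and is used only as a tie-breaker.
rows = [
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "line_no": 2},
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "line_no": 1},
    {"invoice_id": 1, "update_time": "2018-02-03 14:00:00.000", "line_no": 3},
    {"invoice_id": 2, "update_time": "2018-03-01 09:00:00.000", "line_no": 1},
]


def by_invoice(rows):
    """Yield (invoice_id, rows) groups, similar to partitioning by invoice_id."""
    keyed = sorted(rows, key=itemgetter("invoice_id"))
    for invoice_id, group in groupby(keyed, key=itemgetter("invoice_id")):
        yield invoice_id, list(group)


def min_filter(rows):
    """Keep every row whose update_time equals its partition's minimum."""
    out = []
    for _, group in by_invoice(rows):
        wanted = min(r["update_time"] for r in group)
        out.extend(r for r in group if r["update_time"] == wanted)
    return out


def first_row(rows):
    """Keep one row per invoice, ordering by (update_time, line_no)."""
    return [
        min(group, key=itemgetter("update_time", "line_no"))
        for _, group in by_invoice(rows)
    ]


tied = min_filter(rows)   # both tied rows of invoice 1 survive the filter
unique = first_row(rows)  # exactly one row per invoice, chosen deterministically
print(len(tied))
print([(r["invoice_id"], r["line_no"]) for r in unique])
```

Ordering on update_time alone leaves the choice between the two tied rows
open, which matches the run-to-run differences reported at the start of the
thread. Adding a second column that is unique within the partition to the
ordering makes the choice repeatable, assuming such a column exists in the data.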

-- 
Thanks,
Jason
