spark-user mailing list archives

From Chetan Khatri <chetan.opensou...@gmail.com>
Subject Re: dropDuplicate on timestamp based column unexpected output
Date Thu, 04 Apr 2019 18:24:08 GMT
Abdeali, Jason:

I submitted the Spark job with num-executors 8, num-cores 8, driver-memory 14g
and executor-memory 14g. The total data processed was 5 GB, with 100+
aggregations and 50+ different joins at various DataFrame levels.

So it is really hard to give a specific number of partitions. But I have not
done a repartition / coalesce, so the default of 200 would be used, I guess.

I read a long time ago that window functions are a performance killer, so I
wanted to clarify my doubt.

Thanks



On Thu, Apr 4, 2019 at 10:43 PM Jason Nerothin <jasonnerothin@gmail.com>
wrote:

> My thinking is that if you run everything in one partition - say 12 GB -
> then you don't experience the partitioning problem - one partition will
> have all duplicates.
>
> If that's not the case, there are other options, but would probably
> require a design change.
>
> On Thu, Apr 4, 2019 at 8:46 AM Jason Nerothin <jasonnerothin@gmail.com>
> wrote:
>
>> How much memory do you have per partition?
>>
>> On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri <chetan.opensource@gmail.com>
>> wrote:
>>
>>> I will get the information and will share with you.
>>>
>>> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari <abdealikothari@gmail.com>
>>> wrote:
>>>
>>>> How long does the window solution take? (Also mention how many
>>>> executors your Spark application was using on average during that time.)
>>>> I am not aware of anything that is faster. When I ran it on my data
>>>> (~8-9 GB), I think it took less than 5 mins (don't remember the exact time).
>>>>
>>>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <
>>>> chetan.opensource@gmail.com> wrote:
>>>>
>>>>> Thanks for the awesome clarification / explanation.
>>>>>
>>>>> I have cases where update_time can be the same.
>>>>> I need suggestions: with large data, around 5 GB, the window-based
>>>>> solution I mentioned takes a very long time.
>>>>>
>>>>> Thanks again.
>>>>>
>>>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <
>>>>> abdealikothari@gmail.com> wrote:
>>>>>
>>>>>> So, the above code for min() worked for me fine in general, but there
>>>>>> was one corner case where it failed.
>>>>>> Which was when I have something like:
>>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>>>
>>>>>> In this example, the update_time for 2 records is exactly the same. So,
>>>>>> doing a filter for the min() will result in 2 records for invoice_id=1.
>>>>>> This is avoided in your row_num code snippet, because 2 rows will
>>>>>> never both have row_num = 1.
>>>>>>
>>>>>> But note that here, row_num=1 and row_num=2 will be randomly ordered
>>>>>> (because the orderBy is on update_time and they have the same value of
>>>>>> update_time).
>>>>>> Hence dropDuplicates can be used there, because the result can be either
>>>>>> one of those rows.
>>>>>>
>>>>>> Overall, dropDuplicates seems like it's meant for cases where you
>>>>>> literally have redundant duplicated data, and not for filtering to get
>>>>>> the first/last record, etc.
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>>>>> chetan.opensource@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Abdeali, Thank you for your response.
>>>>>>>
>>>>>>> Can you please explain this line to me: "And the dropDuplicates at
>>>>>>> the end ensures records with two values for the same 'update_time'
>>>>>>> don't cause issues."
>>>>>>>
>>>>>>> Sorry, I didn't get it right away. :)
>>>>>>>
>>>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>>>>> abdealikothari@gmail.com> wrote:
>>>>>>>
>>>>>>>> I've faced this issue too, and a colleague pointed me to the
>>>>>>>> documentation:
>>>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>>>> The dropDuplicates docs do not say that it is guaranteed to
>>>>>>>> return the "first" record (even if you sort your dataframe).
>>>>>>>> It gives you any record it finds and just ensures that
>>>>>>>> duplicates are not present.
>>>>>>>>
>>>>>>>> The only way I know of to do this is what you did, but you can
>>>>>>>> avoid the sorting inside the partition with something like (in pyspark):
>>>>>>>>
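>>>>>>>> from pyspark.sql import Window, functions as F
>>>>>>>> # earliest update_time per invoice_id, computed without sorting
>>>>>>>> df = df.withColumn('wanted_time',
>>>>>>>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>>>>> # keep the rows at that time; dropDuplicates takes a list of columns
>>>>>>>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>>>>>>>     .drop('wanted_time')
>>>>>>>>     .dropDuplicates(['invoice_id', 'update_time']))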
>>>>>>>>
>>>>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>>>>> And the dropDuplicates at the end ensures records with two values
>>>>>>>> for the same 'update_time' don't cause issues.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>>>>> chetan.opensource@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Dear Spark Users,
>>>>>>>>>
>>>>>>>>> I am using dropDuplicates on a DataFrame generated from a large
>>>>>>>>> parquet file (from HDFS), deduplicating on a timestamp-based
>>>>>>>>> column. Every time I run it, it drops different rows that share
>>>>>>>>> the same timestamp.
>>>>>>>>>
>>>>>>>>> What I tried, and what worked:
>>>>>>>>>
>>>>>>>>> val wSpec = Window.partitionBy($"invoice_id")
>>>>>>>>>   .orderBy($"update_time".desc)
>>>>>>>>>
>>>>>>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>>>>>>>   row_number.over(wSpec)).where($"rn" === 1)
>>>>>>>>>   .drop("rn").drop("update_time")
>>>>>>>>>
>>>>>>>>> But this is very slow...
>>>>>>>>>
>>>>>>>>> Can someone please shed some light on this?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>
>> --
>> Thanks,
>> Jason
>>
>
>
> --
> Thanks,
> Jason
>
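
The symptom from the original question (a different row surviving on each
run) can be modeled in plain Python. This is not Spark code and makes no claim
about how Spark implements dropDuplicates. It is a rough sketch that assumes a
dedup keeps whichever row it happens to see first for each key. The rows and
the `amount` column are hypothetical.

```python
# Plain-Python model, NOT Spark code.
# Assumption: the dedup keeps the first row it sees for each key value.
# The `amount` column and the sample rows are hypothetical.

def drop_duplicates_first_seen(rows, key):
    """Return one row per distinct value of `key`, keeping the first seen."""
    kept = {}
    for row in rows:
        # setdefault stores the row only if this key value is new
        kept.setdefault(row[key], row)
    return list(kept.values())

rows = [
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "amount": 10},
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "amount": 20},
]

# Same data in two arrival orders, standing in for two runs of the job.
run_a = drop_duplicates_first_seen(rows, "update_time")
run_b = drop_duplicates_first_seen(list(reversed(rows)), "update_time")

print(run_a[0]["amount"], run_b[0]["amount"])
```

Both runs return exactly one row, but not the same one. That is consistent
with the documentation point quoted in the thread: duplicates are removed, with
no promise about which record is kept.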

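
The tie case discussed in the thread (two rows for invoice_id=1 with the same
update_time) can also be sketched in plain Python. Again, this is a model of
the logic and not Spark code. The `source` column and the helper names
(`min_filter`, `dedupe`, `first_by`) are hypothetical. Using a second ordering
column as a tie-breaker is an assumption on my part and is not something
proposed in the thread.

```python
# Plain-Python model of the min()-filter tie case, NOT Spark code.
# The `source` column and all helper names are hypothetical.

rows = [
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "source": "a"},
    {"invoice_id": 1, "update_time": "2018-01-01 15:00:00.000", "source": "b"},
    {"invoice_id": 1, "update_time": "2018-02-03 14:00:00.000", "source": "c"},
]

def min_filter(rows):
    """Keep every row whose update_time equals its invoice's minimum."""
    earliest = {}
    for r in rows:
        k = r["invoice_id"]
        # fixed-width timestamp strings compare correctly as text
        if k not in earliest or r["update_time"] < earliest[k]:
            earliest[k] = r["update_time"]
    return [r for r in rows if r["update_time"] == earliest[r["invoice_id"]]]

def dedupe(rows, keys):
    """Keep the first row seen for each combination of `keys`."""
    kept = {}
    for r in rows:
        kept.setdefault(tuple(r[k] for k in keys), r)
    return list(kept.values())

def first_by(rows, order_keys):
    """Per invoice_id, keep the first row after sorting by `order_keys`."""
    best = {}
    for r in sorted(rows, key=lambda r: tuple(r[k] for k in order_keys)):
        best.setdefault(r["invoice_id"], r)
    return list(best.values())

tied = min_filter(rows)                               # both tied rows survive
unique = dedupe(tied, ["invoice_id", "update_time"])  # one row, either of the two
picked = first_by(rows, ["update_time", "source"])    # tie broken by `source`

print(len(tied), len(unique), picked[0]["source"])
```

If it matters which of the tied rows survives, ordering on an extra column
makes the choice repeatable regardless of input order. If either row is
acceptable, the min() filter followed by a dedup on the key columns is enough.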