Abdeali, Jason:

The job was submitted with num-executors 8, executor-cores 8, driver-memory 14g, and executor-memory 14g. The total data processed was about 5 GB, with 100+ aggregations and 50+ different joins at various DataFrame levels.

So it is really hard to give a specific number of partitions. But I have not done a repartition/coalesce, so I guess the default of 200 shuffle partitions is being used.
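For reference, that default of 200 comes from spark.sql.shuffle.partitions, which can be overridden at submit time. A sketch combining the resource flags mentioned above with a hypothetical partition count and script name (neither of which is from this thread):

```shell
# Resource flags are the ones mentioned above; the partition count (64)
# and the script name are hypothetical, not from this thread.
spark-submit \
  --num-executors 8 \
  --executor-cores 8 \
  --driver-memory 14g \
  --executor-memory 14g \
  --conf spark.sql.shuffle.partitions=64 \
  dedup_job.py
```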

I read, a long time ago, that window functions are performance killers, so I wanted to clarify my doubt.


On Thu, Apr 4, 2019 at 10:43 PM Jason Nerothin <jasonnerothin@gmail.com> wrote:
My thinking is that if you run everything in one partition - say 12 GB - then you don't experience the partitioning problem - one partition will have all duplicates.

If that's not the case, there are other options, but would probably require a design change.

On Thu, Apr 4, 2019 at 8:46 AM Jason Nerothin <jasonnerothin@gmail.com> wrote:
How much memory do you have per partition?

On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri <chetan.opensource@gmail.com> wrote:
I will get the information and will share with you.

On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari <abdealikothari@gmail.com> wrote:
How long does it take to do the window solution? (Also mention how many executors your Spark application was using on average during that time.)
I am not aware of anything that is faster. When I ran it on my data (~8-9 GB), I think it took less than 5 minutes (I don't remember the exact time).

On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <chetan.opensource@gmail.com> wrote:
Thanks for the awesome clarification/explanation.

I have cases where update_time can be the same.
I am in need of suggestions: with fairly large data like 5 GB, the window-based solution I mentioned is taking a very long time.

Thanks again.

On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <abdealikothari@gmail.com> wrote:
So, the above code using min() worked fine for me in general, but there was one corner case where it failed,
which was when I had something like:
invoice_id=1, update_time=2018-01-01 15:00:00.000
invoice_id=1, update_time=2018-01-01 15:00:00.000
invoice_id=1, update_time=2018-02-03 14:00:00.000

In this example, the update_time for 2 records is exactly the same, so filtering for the min() results in 2 records for invoice_id=1.
This is avoided in your row_number() snippet, because two rows will never both have row_num = 1.

But note that here, row_num=1 and row_num=2 will be randomly ordered (because the orderBy is on update_time and both rows have the same update_time value).
Hence dropDuplicates can be used there, because either one of those rows is acceptable.
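To make the tie concrete, here is a plain-Python sketch (not Spark) of the same logic, using the example rows above: the min() filter keeps both tied rows, and a duplicate-drop on (invoice_id, update_time) then keeps exactly one.

```python
# The example rows from above, as (invoice_id, update_time) tuples
rows = [
    (1, "2018-01-01 15:00:00.000"),
    (1, "2018-01-01 15:00:00.000"),
    (1, "2018-02-03 14:00:00.000"),
]

# Step 1: min(update_time) per invoice_id (what the window min() computes);
# ISO-formatted timestamps compare correctly as strings
min_time = {}
for inv, ts in rows:
    min_time[inv] = min(min_time.get(inv, ts), ts)

# Step 2: filtering on the min keeps BOTH tied rows
kept = [r for r in rows if r[1] == min_time[r[0]]]
assert len(kept) == 2  # the tie survives the filter

# Step 3: dropping duplicates on (invoice_id, update_time) keeps one of them
deduped = list(dict.fromkeys(kept))
assert len(deduped) == 1
```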

Overall, dropDuplicates seems meant for cases where you literally have redundant duplicated data, not for filtering to get the first/last record, etc.

On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <chetan.opensource@gmail.com> wrote:
Hello Abdeali, Thank you for your response.

Can you please explain this line to me: "And the dropDuplicates at the end ensures records with two values for the same 'update_time' don't cause issues."

Sorry, I didn't get it quickly. :)

On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <abdealikothari@gmail.com> wrote:
I've faced this issue too - and a colleague pointed me to the documentation - https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
The dropDuplicates docs do not guarantee that it will return the "first" record (even if you sort your DataFrame).
It gives you whichever record it finds and just ensures that duplicates are not present.

The only way I know of to do this is what you did, but you can avoid the sort inside the partition with something like this (in PySpark):

from pyspark.sql import Window, functions as F
df = df.withColumn('wanted_time', F.min('update_time').over(Window.partitionBy('invoice_id')))
out_df = (df.filter(df['update_time'] == df['wanted_time'])
            .drop('wanted_time')
            .dropDuplicates(['invoice_id', 'update_time']))  # subset must be a list

The min() is faster than doing an orderBy() and a row_number().
And the dropDuplicates at the end ensures records with two values for the same 'update_time' don't cause issues.

On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <chetan.opensource@gmail.com> wrote:
Hello Dear Spark Users,

I am using dropDuplicates on a DataFrame generated from a large Parquet file (on HDFS), deduplicating on a timestamp-based column. Every time I run it, it drops different rows for the same timestamp.

What I tried, and what worked:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)

val irqDistinctDF = irqFilteredDF
  .withColumn("rn", row_number().over(wSpec))
  .where($"rn" === 1)
  .drop("rn", "update_time")

But this is damn slow...

Can someone please throw some light on this?