spark-user mailing list archives

From Miguel Morales <therevolti...@gmail.com>
Subject Re: High level explanation of dropDuplicates
Date Sat, 11 Jan 2020 20:01:41 GMT
I would just map each row to a pair keyed by the id, then do a reduceByKey that compares the
scores and keeps the highest. Then call .values to drop the keys, and that should do it.
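
A minimal sketch of that idea, meant for pasting into spark-shell. It reuses the id/score schema from the quoted example further down; the extra row for id 2, the local SparkSession, and the names bestRows and deduped are illustrative assumptions. It also assumes neither column contains nulls.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Local session for illustration only; in spark-shell `spark` already exists
// and getOrCreate() simply returns it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dedup-sketch")
  .getOrCreate()

// Same schema as the quoted example; the Row(2, 5) entry is a hypothetical
// extra row so that more than one key is present.
val schema = StructType(List(
  StructField("id", IntegerType, true),
  StructField("score", IntegerType, true)
))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(1, 10), Row(1, 20), Row(1, 11), Row(2, 5))),
  schema
)

// 1. Map each row to a pair keyed by id.
// 2. reduceByKey keeps whichever of two rows has the higher score.
// 3. .values drops the key again, leaving one row per id.
// Assumes no null ids or scores.
val bestRows = df.rdd
  .map(row => (row.getAs[Int]("id"), row))
  .reduceByKey((a, b) => if (a.getAs[Int]("score") >= b.getAs[Int]("score")) a else b)
  .values

// Back to a DataFrame with the original schema.
val deduped = spark.createDataFrame(bestRows, schema)
deduped.show()
```

With this data, id 1 should keep the row with score 20. Because the reduce function compares scores directly, the result does not depend on the order in which rows arrive, and reduceByKey combines rows within each partition before shuffling.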

> On Jan 11, 2020, at 11:14 AM, Rishi Shah <rishishah.star@gmail.com> wrote:
> 
> 
> Thanks everyone for your contributions on this topic. I wanted to check in to see if anyone
> has discovered a different approach, or has an opinion on a better approach, to deduplicating
> data using PySpark. I would really appreciate any further insight on this.
> 
> Thanks,
> -Rishi
> 
>> On Wed, Jun 12, 2019 at 4:21 PM Yeikel <email@yeikel.com> wrote:
>> Nicholas, thank you for your explanation.
>> 
>> I am also interested in the example that Rishi is asking for. I am sure
>> mapPartitions may work, but as Vladimir suggests it may not be the best
>> option in terms of performance.
>> 
>> @Vladimir Prus, are you aware of any example of writing a "custom
>> physical exec operator"?
>> 
>> If anyone needs further explanation of the follow-up question Rishi
>> posted, please see the example below:
>> 
>> 
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>> 
>> 
>> val someData = Seq(
>>   Row(1, 10),
>>   Row(1, 20),
>>   Row(1, 11)
>> )
>> 
>> val schema = List(
>>   StructField("id", IntegerType, true),
>>   StructField("score", IntegerType, true)
>> )
>> 
>> val df = spark.createDataFrame(
>>   spark.sparkContext.parallelize(someData),
>>   StructType(schema)
>> )
>> 
>> // Goal: drop duplicates using "id" as the primary key and keep the
>> // highest "score".
>> 
>> df.sort($"score".desc).dropDuplicates("id").show
>> 
>> == Physical Plan ==
>> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
>> +- Exchange hashpartitioning(id#191, 200)
>>    +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
>>       +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>>          +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>>             +- Scan ExistingRDD[id#191,score#192]
>> 
>> This seems to work, but I don't know what the implications are if we use
>> this approach with a bigger dataset, or what the alternatives are. From the
>> explain output I can see the two Exchanges, so it may not be the best
>> approach?
> 
> 
> -- 
> Regards,
> 
> Rishi Shah
