spark-user mailing list archives

From Rohit Verma <rohit.ve...@rokittech.com>
Subject Which is better algorithm to use
Date Tue, 10 Jan 2017 12:45:00 GMT
Goal:
Find whether two datasets have completely overlapping columns, i.e. every value of c1
exists in c2.


Approach 1.
Do a direct join.
val ref = spark.sql("SELECT _c2 AS ref FROM csv.`/Users/rohit/Desktop/star2002-full.csv`")
val dep = spark.sql("SELECT _c1 AS dep FROM csv.`/Users/rohit/Desktop/star2002-full.csv`")

ref.registerTempTable("ref")
dep.registerTempTable("dep")
val ind = spark.sql("SELECT * FROM ref LEFT JOIN dep ON ref.ref = dep.dep WHERE dep.dep IS NULL
LIMIT 1")

The plan is:

[inline image: query plan for Approach 1]
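For reference, a sketch of the same check using the DataFrame API's "left_anti" join type
(available since Spark 2.0); an anti join returns only the left side's unmatched rows, so the
WHERE ... IS NULL filter is no longer needed:

// Sketch: same anti-join as the SQL above. An empty result means every
// ref value has a match in dep.
val unmatched = ref.join(dep, ref("ref") === dep("dep"), "left_anti").limit(1)
val fullyOverlapping = unmatched.count() == 0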


Approach 2:
Do distinct first, then join (distinct is done via GROUP BY here).
val ref = spark.sql("SELECT _c2 AS ref FROM csv.`/Users/rohit/Desktop/star2002-full.csv` GROUP
BY _c2 ORDER BY _c2 DESC")
val dep = spark.sql("SELECT _c1 AS dep FROM csv.`/Users/rohit/Desktop/star2002-full.csv` GROUP
BY _c1 ORDER BY _c1 DESC")
ref.registerTempTable("ref")
dep.registerTempTable("dep")
val ind = spark.sql("SELECT * FROM ref LEFT JOIN dep ON ref.ref = dep.dep WHERE dep.dep IS NULL
LIMIT 1")

The join plan for the ind computation above:

[inline image: query plan for Approach 2]
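The GROUP BY above is only there to deduplicate; a sketch of the same thing with the DataFrame
API, which should produce a similar aggregate-based plan (distinct() is implemented as an
aggregate over all columns):

val refDistinct = spark.sql("SELECT _c2 AS ref FROM csv.`/Users/rohit/Desktop/star2002-full.csv`").distinct()
val depDistinct = spark.sql("SELECT _c1 AS dep FROM csv.`/Users/rohit/Desktop/star2002-full.csv`").distinct()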


Approach 3

Use toLocalIterator to fail fast by comparing the sorted datasets directly.

// requires java.util.Iterator and org.apache.spark.sql.Row
final Iterator<Row> ref = spark.sql(refQuery).toLocalIterator();
final Iterator<Row> dep = spark.sql(depQuery).toLocalIterator();

while (dep.hasNext()) {
    if (!ref.hasNext()) return false;
    // compare row contents with equals(); == only checks reference identity,
    // and calling next() twice per loop would silently skip rows
    if (!ref.next().equals(dep.next())) return false;
}
return true;

The code above works on sorted lists and uses toLocalIterator. Is it safe to use
toLocalIterator for big lists that do not fit in memory?

I believe with this code we may not need to compare all of the values, which a join would
require. Although I am not clear whether the LIMIT expression in the join statement is doing
something similar.
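If the intent is containment rather than prefix equality, a merge-style walk over the two
sorted iterators fails fast on the first dep value absent from ref. A sketch, untested:
sortedSubset is a hypothetical helper, and it assumes both sides are distinct, sorted
ascending, and compared on a single string column (.asScala wraps the java.util.Iterator
returned by toLocalIterator):

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row

// Hypothetical helper: skip ref values smaller than the current dep value,
// then require an exact match; forall short-circuits on the first miss.
def sortedSubset(dep: Iterator[Row], ref: Iterator[Row]): Boolean = {
  val r = ref.buffered
  dep.forall { d =>
    val v = d.getString(0)
    while (r.hasNext && r.head.getString(0) < v) r.next()
    r.hasNext && r.head.getString(0) == v
  }
}

// usage:
// sortedSubset(spark.sql(depQuery).toLocalIterator().asScala,
//              spark.sql(refQuery).toLocalIterator().asScala)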



Regards
Rohit

On Jan 5, 2017, at 4:21 PM, Richard Startin <richardstartin@outlook.com> wrote:

Why not do that with Spark SQL to utilise the executors properly, rather than a sequential
filter on the driver?

Select * from A left join B on A.fk = B.fk where B.pk is NULL limit k

If you were sorting just so you could iterate in order, this might save you a couple of sorts
too.
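(Spark 2.x also accepts anti-join syntax directly; a sketch of the equivalent query, keeping
the hypothetical tables A and B and the symbolic limit k from above:)

val missed = spark.sql(s"SELECT * FROM A LEFT ANTI JOIN B ON A.fk = B.fk LIMIT $k")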

https://richardstartin.com

On 5 Jan 2017, at 10:40, Rohit Verma <rohit.verma@rokittech.com> wrote:

Hi all,

I am aware that collect will return a list aggregated on the driver; this will cause an OOM
when the list is too big.
Is toLocalIterator safe to use with a very big list? I want to access all values one by one.

Basically the goal is to compare two sorted RDDs (A and B) to find the top k entries that are
present in A but missing in B.
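(A minimal sketch of that at the RDD level, assuming rddA and rddB hold the keys: subtract
computes the set difference across the executors, and takeOrdered pulls back only the k
smallest results under the implicit ordering:)

val missed = rddA.subtract(rddB).takeOrdered(k)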

Rohit

