spark-user mailing list archives

From Deepak Gopalakrishnan <dgk...@gmail.com>
Subject Re: Mapper side join with DataFrames API
Date Thu, 03 Mar 2016 01:36:25 GMT
Hello,

I'm using Spark 1.6.0 on EMR.

On Thu, Mar 3, 2016 at 12:34 AM, Yong Zhang <java8964@hotmail.com> wrote:

> What version of Spark are you using?
>
> I am also trying to figure out how to do the map side join in Spark.
>
> In 1.5.x, there is a broadcast function in the DataFrame API, and it caused
> an OOM in my simple test case, even though one side of the join is very small.
>
> I am still trying to find the root cause.
>
> Yong
>
> ------------------------------
> Date: Wed, 2 Mar 2016 15:38:29 +0530
> Subject: Re: Mapper side join with DataFrames API
> From: dgkris@gmail.com
> To: michael@databricks.com
> CC: user@spark.apache.org
>
>
> Thanks for the help, guys.
>
> Let me ask part of my question in a slightly different way.
>
> I have attached my screenshots here. A lot of memory is unused, and yet
> there is a spill (as shown in the screenshots). Any idea why?
>
> Thanks
> Deepak
>
> On Wed, Mar 2, 2016 at 5:14 AM, Michael Armbrust <michael@databricks.com>
> wrote:
>
> It's helpful to always include the output of df.explain(true) when you are
> asking about performance.
>
> On Mon, Feb 29, 2016 at 6:14 PM, Deepak Gopalakrishnan <dgkris@gmail.com>
> wrote:
>
> Hello All,
>
> I'm trying to join two DataFrames, A and B, with:
>
> sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a=B.a");
>
> I have registered temp tables for A and B after loading these DataFrames
> from different sources. I need the join to be really fast, and I was
> wondering whether there is a way to keep the SQL statement and still get a
> mapper-side join (say, when my table B is small).
>
> I read some articles on using broadcast to do mapper-side joins. Could I
> do something like this and then execute my SQL statement to achieve a
> mapper-side join?
>
> DataFrame B = sparkContext.broadcast(B);
> B.registerTempTable("B");
>
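For reference, a mapper-side join amounts to a hash join in which the small table is copied to every task and the large table is streamed past it, so the large side needs no shuffle or sort. The sketch below shows that idea in plain Java with no Spark dependency; the class name `MapSideJoinSketch`, the `hashJoin` helper, and the `String[]` row layout are hypothetical and chosen only for illustration. Note that `sparkContext.broadcast(...)` returns a `Broadcast` wrapper rather than a DataFrame, so the snippet quoted above would likely not compile as written; in the DataFrame API of 1.5 and later, the `broadcast` function in `org.apache.spark.sql.functions` is the hint meant for this case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of a map-side (broadcast hash) join; no Spark involved. */
class MapSideJoinSketch {

    /**
     * Inner-joins the large side against the small side on column 0.
     * Each row is a String[]; an output row is the large row followed by the small row.
     */
    static List<String[]> hashJoin(List<String[]> large, List<String[]> small) {
        // Build phase: index the small table by join key. On a cluster, this
        // map is the piece that would be shipped (broadcast) to every task.
        Map<String, List<String[]>> index = new HashMap<>();
        for (String[] row : small) {
            index.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }

        // Probe phase: stream the large table once and look up each key.
        List<String[]> out = new ArrayList<>();
        for (String[] left : large) {
            List<String[]> matches = index.get(left[0]);
            if (matches == null) {
                continue; // inner join: rows without a match are dropped
            }
            for (String[] right : matches) {
                String[] joined = Arrays.copyOf(left, left.length + right.length);
                System.arraycopy(right, 0, joined, left.length, right.length);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> a = Arrays.asList(
            new String[] {"1", "x"}, new String[] {"2", "y"}, new String[] {"3", "z"});
        List<String[]> b = Arrays.asList(
            new String[] {"1", "small-1"}, new String[] {"3", "small-3"});
        for (String[] row : hashJoin(a, b)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

The build side has to fit in memory on every task, which is why this approach only suits a small table B.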
>
> I have a join as stated above, and I see the following in my executor logs:
>
> 16/02/29 17:02:35 INFO TaskSetManager: Finished task 198.0 in stage 7.0
> (TID 1114) in 20354 ms on localhost (196/200)
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 200 non-empty
> blocks out of 200 blocks
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty
> blocks out of 128 blocks
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
> 16/02/29 17:03:03 INFO Executor: Finished task 199.0 in stage 7.0 (TID
> 1115). 2511 bytes result sent to driver
> 16/02/29 17:03:03 INFO TaskSetManager: Finished task 199.0 in stage 7.0
> (TID 1115) in 27621 ms on localhost (197/200)
>
> *16/02/29 17:07:06 INFO UnsafeExternalSorter: Thread 124 spilling sort
> data of 256.0 KB to disk (0  time so far)*
>
>
> Now, I have around 10G of executor memory, and my memory fraction should be
> the default (0.75 as per the documentation). My memory usage is < 1.5G
> (obtained from the Storage tab on the Spark dashboard), but it still reports
> spilling sort data. I'm a little surprised that this happens even when I
> have enough free memory.
> Any inputs will be greatly appreciated!
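As a rough worked example of the numbers above: in Spark 1.6 the unified memory region is, to my understanding, (heap minus a fixed 300 MB reservation) times `spark.memory.fraction`, and each running task can claim between 1/(2N) and 1/N of the execution pool when N tasks are active. The sketch below just does that arithmetic. The class and method names are hypothetical, the 8 concurrent tasks are an assumed figure that does not come from the thread, and the real JVM heap is usually somewhat smaller than the configured 10G.

```java
/** Back-of-the-envelope arithmetic for Spark 1.6 unified memory; not Spark code. */
class UnifiedMemorySketch {
    static final long MB = 1024L * 1024L;
    // Assumption: fixed reservation subtracted before the fraction is applied.
    static final long RESERVED_BYTES = 300 * MB;

    /** Size of the region shared by execution and storage. */
    static long unifiedRegionBytes(long heapBytes, double memoryFraction) {
        return (long) ((heapBytes - RESERVED_BYTES) * memoryFraction);
    }

    /** Upper bound on what one task may hold when activeTasks are running. */
    static long maxPerTaskBytes(long poolBytes, int activeTasks) {
        return poolBytes / activeTasks;
    }

    /** Share a task is guaranteed before it can be forced to spill. */
    static long minPerTaskBytes(long poolBytes, int activeTasks) {
        return poolBytes / (2L * activeTasks);
    }

    public static void main(String[] args) {
        long heap = 10L * 1024 * MB;   // "around 10G" from the message
        double fraction = 0.75;        // default quoted in the message
        int activeTasks = 8;           // hypothetical number of concurrent tasks

        long unified = unifiedRegionBytes(heap, fraction);
        System.out.println("unified region MB: " + unified / MB);
        System.out.println("max per task MB:   " + maxPerTaskBytes(unified, activeTasks) / MB);
        System.out.println("min per task MB:   " + minPerTaskBytes(unified, activeTasks) / MB);
    }
}
```

The Storage tab reports cached blocks only, so a low figure there says little about how much execution memory the sort could obtain. This arithmetic does not by itself explain the 256 KB spill.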
> Thanks
> --
> Regards,
> *Deepak Gopalakrishnan*
> *Mobile*:+918891509774
> *Skype* : deepakgk87
> http://myexps.blogspot.com
>
>
>



-- 
Regards,
*Deepak Gopalakrishnan*
*Mobile*:+918891509774
*Skype* : deepakgk87
http://myexps.blogspot.com
