spark-user mailing list archives

From "Sesterhenn, Mike" <msesterh...@cars.com>
Subject RE: Best way to process lookup ETL with Dataframes
Date Wed, 04 Jan 2017 23:29:48 GMT
Thanks a lot, Nicholas.  RE: Upgrading, I was afraid someone would suggest that.  ☺  Yes, we
have an upgrade planned, but due to politics, we have to finish this first round of ETL before
we can do the upgrade.  I can’t confirm for sure that this issue would be fixed in Spark
>= 1.6 without doing the upgrade first, so I won’t be able to win the argument for upgrading
yet…  You see the problem…   :(

Anyway, the good news is we just had a memory upgrade, so I should be able to persist more
of the dataframes.  I am currently only persisting the join table (the table I am joining
to, not the input data).  Although I do cache the input at some point before the join, I don't
do it before every split+merge.  I'll have to persist the input data better.

Thinking on it now, is it even necessary to cache the table I am joining to?  Probably only
if it is used more than once, right?
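For what it's worth, here is roughly what I am planning (just a sketch; the paths and names are made up, and MEMORY_AND_DISK is only one possible storage level):

import org.apache.spark.storage.StorageLevel

// persist the input once, up front, so the split/merge branches don't
// recompute the lineage back to the raw read every time
val input = sqlContext.read.parquet("/path/to/input")     // made-up path
  .persist(StorageLevel.MEMORY_AND_DISK)

// the lookup table probably only needs caching if it is joined to more than once
val lookup = sqlContext.read.parquet("/path/to/lookup")   // made-up path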

Thanks,
-Mike


From: Nicholas Hakobian [mailto:nicholas.hakobian@rallyhealth.com]
Sent: Friday, December 30, 2016 5:50 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

Yep, sequential joins is what I have done in the past with similar requirements.

Splitting and merging DataFrames is most likely killing performance if you do not cache the
DataFrame pre-split. If you do, it will compute the lineage prior to the cache statement once
(at first invocation), then use the cached result to perform the additional join, then union
the results. Without the cache, you are most likely computing the full lineage twice, all
the way back to the raw data import, and incurring double the read I/O.
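
In code terms it is roughly this (just a sketch; the DataFrame and column names are placeholders for whatever you actually have):

// cache the joined DataFrame once, right before the filter() calls that
// split it, so both branches reuse the same computed result
val joined = input
  .join(lookup1, input("id") === lookup1("id"), "left_outer")
  .cache()   // or persist() with an explicit StorageLevel

val hit  = joined.filter(joined("lookup_val").isNotNull)   // lookup succeeded
val miss = joined.filter(joined("lookup_val").isNull)      // lookup failed; goes on to the second join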

The optimal path will most likely depend on the size of the tables you are joining to.
If both are small (compared to the primary data source) and can be broadcast, the
sequential join will most likely be the easiest and most efficient approach. If one (or both)
of the tables you are joining to is large enough that it cannot be efficiently
broadcast, going through the join / cache / split / second join / union path is likely to
be faster. It also depends on how much memory you can dedicate to caching... the possibilities
are endless.
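
For the small-lookup case, you can nudge Spark toward a broadcast join with the broadcast() hint in org.apache.spark.sql.functions (it should be available in 1.5, but double-check against your build). A sketch, with made-up DataFrame and column names:

import org.apache.spark.sql.functions.broadcast

// ship the small lookup tables to every executor instead of shuffling
// the (much larger) input for each join
val withLookups = input
  .join(broadcast(lookup1), input("id") === lookup1("id"), "left_outer")
  .join(broadcast(lookup2), input("id") === lookup2("id"), "left_outer")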

I tend to approach this type of problem by weighing the cost of extra development time for
a more complex join vs. the extra execution time vs. the frequency of execution. For something that
will execute daily (or more frequently), the cost of more development to get faster execution
time (even if it's only 2x faster) might be worth it.

It might also be worth investigating if a newer version of Spark (1.6 at the least, or 2.0
if possible) is feasible to install. There are lots of performance improvements in those versions,
if you have the option of upgrading.

-Nick

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakobian@rallyhealth.com


On Fri, Dec 30, 2016 at 3:35 PM, Sesterhenn, Mike <msesterhenn@cars.com>
wrote:

Thanks, Nicholas.  It looks like for some of my use cases, I might be able to do sequential
joins, and then use coalesce() (possibly in combination with withColumn(when()...)) to sort out
the results.



Splitting and merging dataframes seems to really kill my app performance.  I'm not sure if
it's a Spark 1.5 thing or what, but I just refactored one column to do one less split/merge,
and it saved me almost half the time on my job.  But for some use cases I don't seem to be
able to avoid them.  In some cases it is important NOT to do a join for a row under certain
conditions, because bad data will result.
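
Roughly what I have in mind is below (a rough sketch with made-up column names; "skip_lookup" and "orig_val" don't exist in my real data):

import org.apache.spark.sql.functions.{coalesce, lit, when}

// two sequential left joins, then pick the first non-null lookup value,
// except for rows that must not take a lookup value at all
val out = input
  .join(lookup1, input("id") === lookup1("id"), "left_outer")
  .join(lookup2, input("id") === lookup2("id"), "left_outer")
  .withColumn("final_val",
    when(input("skip_lookup"), input("orig_val"))
      .otherwise(coalesce(lookup1("val"), lookup2("val"), lit("some default"))))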



Any other thoughts?

________________________________
From: Nicholas Hakobian <nicholas.hakobian@rallyhealth.com>
Sent: Friday, December 30, 2016 2:12:40 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org

Subject: Re: Best way to process lookup ETL with Dataframes

It looks like Spark 1.5 has the coalesce function, which is like NVL, but a bit more flexible.
From Ayan's example you should be able to use:
coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested case or if statements,
but it's just harder to read.
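
In the DataFrame API, the "nested case" version would look something like this (a sketch; it assumes you already have the joined DataFrame and that the column names follow Ayan's example):

import org.apache.spark.sql.functions.{lit, when}

// take b's column if present, else c's, else a literal default
val out = joined.withColumn("col",
  when(joined("b_col").isNotNull, joined("b_col"))
    .when(joined("c_col").isNotNull, joined("c_col"))
    .otherwise(lit("some default")))

coalesce() does the same thing in one call; the when() chain is only worth it if you need conditions other than "is null".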

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakobian@rallyhealth.com



On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike <msesterhenn@cars.com>
wrote:

Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)



Reading about the Oracle nvl function, it seems it is similar to the na functions.  Not sure
it will help, though, because what I need is to do another join after the first join fails.

________________________________
From: ayan guha <guha.ayan@gmail.com>
Sent: Thursday, December 29, 2016 11:06 PM
To: Sesterhenn, Mike
Cc: user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

How about this -

select a.*, nvl(b.col, nvl(c.col, 'some default'))
from driving_table a
left outer join lookup1 b on a.id = b.id
left outer join lookup2 c on a.id = c.id

?

On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike <msesterhenn@cars.com>
wrote:

Hi all,



I'm writing an ETL process with Spark 1.5, and I'm wondering about the best way to do something.



A lot of the fields I am processing require an algorithm similar to this:



Join input dataframe to a lookup table.
if (that lookup fails (the joined fields are null)) {
    Lookup into some other table to join some other fields.
}



With Dataframes, it seems the only way to do this is something like the following:



Join input dataframe to a lookup table.
if (that lookup fails (the joined fields are null)) {
   *SPLIT the dataframe into two DFs via DataFrame.filter()
      (one group with a successful lookup, the other failed).*
   For the failed lookups: {
       Lookup into some other table to grab some other fields.
   }
   *MERGE the dataframe splits back together via DataFrame.unionAll().*
}
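
In (very rough) DataFrame code, that looks like the following. This is a sketch only; it assumes the input has columns (id, data) and each lookup table has (id, val), which is simpler than my real schemas:

// first lookup
val joined = input
  .join(lookup1, input("id") === lookup1("id"), "left_outer")
  .select(input("id"), input("data"), lookup1("val"))

// SPLIT: rows where the lookup hit vs. rows where it failed
val hit  = joined.filter(joined("val").isNotNull)
val miss = joined.filter(joined("val").isNull).drop("val")

// second lookup, only for the failed rows
val missFixed = miss
  .join(lookup2, miss("id") === lookup2("id"), "left_outer")
  .select(miss("id"), miss("data"), lookup2("val"))

// MERGE the halves back together (schemas must match positionally)
val result = hit.unionAll(missFixed)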



I'm seeing some really large execution plans in the Spark UI, as you might imagine, and the
processing time seems way out of proportion with the size of the dataset (~250GB in 9 hours).



Is this the best approach to implement an algorithm like this?  Note also that some fields
I am implementing require multiple staged split/merge steps due to cascading lookup joins.



Thanks,



Michael Sesterhenn

msesterhenn@cars.com




--
Best Regards,
Ayan Guha

