spark-user mailing list archives

From "Lalwani, Jayesh" <jlalw...@amazon.com.INVALID>
Subject Re: Merge two dataframes
Date Mon, 17 May 2021 19:31:29 GMT
If the UDFs are computationally expensive, I wouldn't solve this problem with UDFs at all.
If they are working in an iterative manner, and assuming each iteration is independent of
the other iterations (yes, I know that's a big assumption), I would think about exploding your
dataframe to have a row per iteration, working on each row separately, and then aggregating
at the end. This lets you scale your computation much better.

I know not all computations can be made map-reducible like that. However, most can.
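
Something like this rough sketch in PySpark is what I have in mind (N_ITER, per_iteration,
and the "id"/"x" column names are placeholders, not anything from your job):

    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    # Placeholder for the real per-iteration computation
    per_iteration = F.udf(lambda x, i: float(x) * i, DoubleType())

    N_ITER = 10  # made-up number of iterations
    exploded = (
        df.withColumn("iteration", F.explode(F.sequence(F.lit(0), F.lit(N_ITER - 1))))
          .withColumn("partial", per_iteration("x", "iteration"))
    )
    result = exploded.groupBy("id").agg(F.sum("partial").alias("total"))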

Split-and-merge data workflows in Spark don't work like their DAG representations unless
you add costly caches. Without caching, each split results in Spark rereading the data from
the source, even if the splits are later merged back together. The only way to avoid that is
to cache at the split point, which, depending on the amount of data, can become costly. Also,
joins result in shuffles. Avoiding splits and merges is better.
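
To illustrate the caching-at-the-split-point part, a rough sketch (the source path and the
filter conditions are made up):

    from pyspark.sql import functions as F

    base = spark.read.parquet("s3://some-bucket/input/")  # placeholder source
    base.cache()  # without this, each branch below re-reads the source

    evens = base.filter("id % 2 = 0").withColumn("tag", F.lit("even"))
    odds  = base.filter("id % 2 = 1").withColumn("tag", F.lit("odd"))

    merged = evens.unionByName(odds)  # the merge; a join here would also add a shuffle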

To give you an example, we had an application that applied a series of rules to rows. The
required output was a dataframe with an additional column indicating which rule each row
satisfied. In our initial implementation, we had a series of filters, one per rule. For N
rules, we created N dataframes holding the rows that satisfied each rule, and then we unioned
the N dataframes. Horrible performance that didn't scale with N. We reimplemented it to add
N Boolean columns, one per rule, each indicating whether that rule was satisfied. We just kept
adding the Boolean columns to the dataframe. After iterating over the rules, we added one more
column that indicated which rule was satisfied, and then dropped the Boolean columns. Much
better performance that scaled with N: Spark read from the data source just once, and since
there were no joins/unions, there was no shuffle.
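
A rough sketch of that second approach (the rules here are made up; ours were more involved):

    from pyspark.sql import functions as F

    # One Boolean column per rule: no joins, no unions, no shuffle
    rules = {
        "rule_1": F.col("amount") > 100,        # placeholder predicates
        "rule_2": F.col("country") == "US",
    }
    for name, predicate in rules.items():
        df = df.withColumn(name, predicate)

    # Record the first rule that matched, then drop the Boolean flag columns
    df = df.withColumn(
        "matched_rule",
        F.coalesce(*[F.when(F.col(name), F.lit(name)) for name in rules]),
    ).drop(*rules.keys())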

On 5/17/21, 2:56 PM, "Andrew Melo" <andrew.melo@gmail.com> wrote:

    In our case, these UDFs are quite expensive and worked on in an
    iterative manner, so being able to cache the two "sides" of the graphs
    independently will speed up the development cycle. Otherwise, if you
    modify foo() here, then you have to recompute bar and baz, even though
    they're unchanged.

    df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))
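
    Concretely, something like this sketch (reusing the foo/bar/baz placeholders from
    above) is what I'd like to be able to do:

        # Cache the side that isn't changing, so editing foo() only reruns foo()
        stable = df.withColumn('b', bar('x')).withColumn('c', baz('x')).cache()
        stable.count()                          # materialize the cache once
        dev = stable.withColumn('a', foo('x'))  # only this part recomputes after edits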

    Additionally, a longer goal would be to be able to persist/cache these
    columns to disk so a downstream user could later mix and match several
    (10s) of these columns together as their inputs w/o having to
    explicitly compute them themselves.

    Cheers
    Andrew

    On Mon, May 17, 2021 at 1:10 PM Sean Owen <srowen@gmail.com> wrote:
    >
    > Why join here - just add two columns to the DataFrame directly?
    >
    > On Mon, May 17, 2021 at 1:04 PM Andrew Melo <andrew.melo@gmail.com> wrote:
    >>
    >> Anyone have ideas about the below Q?
    >>
    >> It seems to me that, given that "diamond" DAG, Spark could see that
    >> the rows haven't been shuffled/filtered and do some type of
    >> "zip join" to push them together, but I've not been able to get a plan
    >> that doesn't do a hash/sort merge join.
    >>
    >> Cheers
    >> Andrew
    >>

    ---------------------------------------------------------------------
    To unsubscribe e-mail: user-unsubscribe@spark.apache.org

