spark-issues mailing list archives

From "Dongjoon Hyun (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-28188) Materialize Dataframe API
Date Wed, 03 Jul 2019 23:17:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-28188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28188:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> Materialize Dataframe API 
> --------------------------
>
>                 Key: SPARK-28188
>                 URL: https://issues.apache.org/jira/browse/SPARK-28188
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Vinitha Reddy Gankidi
>            Priority: Major
>
> We have added a new API to materialize dataframes, and our internal users have found it
very useful. When you run several different computations on the same dataframe, Spark
recomputes the dataframe for each one. This is a problem if evaluating the dataframe is
expensive.
> Materialize is a Spark action. It explicitly tells Spark that the dataframe has already
been computed. Once a dataframe is materialized, Spark skips all stages prior to the
materialize when the dataframe is reused later on.
> Spark may scan the same table twice if two queries load different columns. For example,
the following two queries would scan the same data twice:
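> {code:java}
> import org.apache.spark.sql.functions.{countDistinct, min}
>
> val tab = spark.table("some_table").filter("c LIKE '%match%'")
> // Each query below triggers its own scan of some_table.
> val num_groups = tab.agg(countDistinct($"a"))
> val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min")
> {code}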
>  
> The same table is scanned twice because Spark doesn't know it should load b when the
first query runs. You can use materialize to load and then reuse the data:
> {code:java}
> val materialized = spark.table("some_table").filter("c LIKE '%match%'")
>                         .select($"a", $"b").repartition($"a").materialize()
> // Both queries below reuse the materialized data.
> val num_groups = materialized.agg(countDistinct($"a"))
> val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min")
> {code}
>  
> This uses select to prune the columns that don't need to be loaded. Without it, Spark
doesn't know that only a and b are going to be used later.
> This example also uses repartition to add a shuffle, because Spark resumes from the last
shuffle. In most cases you will need to repartition the dataframe before materializing it
so that the expensive stages can be skipped, because repartition introduces a new stage.
> h3. Materialize vs Cache:
>  * Caching/persisting of dataframes is lazy. The first time the dataframe is computed in
an action, it is kept in memory on the nodes. Materialize is an action that runs a job
that produces the rows of data that a dataframe represents, and returns a new dataframe
with the result. When the resulting dataframe is used, Spark resumes execution using the data
from the last shuffle.
>  * By reusing shuffle data, materialized data is served by the cluster's persistent shuffle
servers instead of Spark executors. This makes materialize more reliable. Caching, on the other
hand, happens in the executor where the task runs, and the data can be lost if executors time
out from inactivity or run out of memory.
>  * Since materialize is more reliable and uses fewer resources than cache, it is usually
a better choice for batch workloads. However, for processing that iterates over a dataset many
times, it is better to keep the data in memory using cache or persist.
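> For comparison, the cache side of this trade-off can be sketched with the existing persist
API. This is only a minimal sketch, assuming a local SparkSession and a small in-memory
stand-in for some_table; the object name CacheSketch and the sample rows are hypothetical.
Because caching is lazy, an action such as count() is run first to populate the cache before
the two queries reuse it:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{countDistinct, min}
import org.apache.spark.storage.StorageLevel

object CacheSketch {
  // Returns (number of distinct groups, minimum b per group).
  def run(spark: SparkSession): (Long, Map[Int, Int]) = {
    import spark.implicits._

    // Hypothetical stand-in for some_table; only a and b are kept.
    val tab = Seq((1, 10, "a match"), (1, 5, "rematch"), (2, 7, "other"))
      .toDF("a", "b", "c")
      .filter("c LIKE '%match%'")
      .select($"a", $"b")

    // persist() is lazy: it only marks the dataframe for caching.
    val cached = tab.persist(StorageLevel.MEMORY_AND_DISK)
    // Running an action computes the rows and fills the cache on the executors.
    cached.count()

    // Both queries below can now read from the cached data.
    val numGroups = cached.agg(countDistinct($"a")).as[Long].head()
    val minByGroup = cached.groupBy($"a").agg(min($"b") as "min")
      .as[(Int, Int)].collect().toMap

    cached.unpersist() // release the cached blocks once they are no longer needed
    (numGroups, minByGroup)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("cache-sketch").getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

> Unlike the proposed materialize, the cached blocks here live on the executors, so Spark
has to recompute them from the original plan if an executor is lost.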



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

