spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: DataFrames Aggregate does not spill?
Date Tue, 22 Sep 2015 00:36:24 GMT

What does the plan look like if you run explain?

In 1.5 the default should be TungstenAggregate, which does spill (it
switches from hash-based aggregation to sort-based aggregation when it runs
out of memory).
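
To make the hash-to-sort fallback concrete, here is a minimal sketch in plain Python. It is not Spark's TungstenAggregate code and does not use Spark at all. The names `spilling_sum` and `max_keys_in_memory` are hypothetical, the key-count limit is an assumed stand-in for a real memory budget, and spilled runs are kept in Python lists where a real engine would write files to disk.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def spilling_sum(rows, max_keys_in_memory):
    """Sum values per key, spilling sorted runs when the hash map is full.

    rows: iterable of (key, value) pairs; keys must be sortable.
    max_keys_in_memory: hypothetical stand-in for a memory budget.
    Returns (result_dict, number_of_spills).
    """
    table = {}  # in-memory hash map: key -> partial sum
    runs = []   # key-sorted lists of (key, partial_sum); "disk" in this sketch

    for key, value in rows:
        # A new key that does not fit triggers a spill of the whole map.
        if key not in table and len(table) >= max_keys_in_memory:
            runs.append(sorted(table.items(), key=itemgetter(0)))
            table = {}
        table[key] = table.get(key, 0) + value

    if not runs:
        # Everything fit in memory: pure hash-based aggregation.
        return table, 0

    # Sort-based phase: merge the sorted runs and combine the partial sums
    # of equal keys, which are now adjacent in the merged stream.
    num_spills = len(runs)
    runs.append(sorted(table.items(), key=itemgetter(0)))
    merged = heapq.merge(*runs, key=itemgetter(0))
    result = {
        key: sum(partial for _, partial in group)
        for key, group in groupby(merged, key=itemgetter(0))
    }
    return result, num_spills
```

With a limit larger than the number of distinct keys the function never spills and behaves like a plain hash aggregation. With a small limit it produces the same sums through the merge phase, holding at most `max_keys_in_memory` entries in the hash map at any time.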

On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mcheah@palantir.com> wrote:

> Hi everyone,
>
> I’m debugging some slowness and apparent memory pressure + GC issues after
> I ported some workflows from raw RDDs to DataFrames. In particular, I’m
> looking into an aggregation workflow that computes many aggregations per
> key at once.
>
> My workflow before was doing a fairly straightforward combineByKey call
> where the aggregation would build up non-trivially sized objects in memory
> – I was computing numerous sums over various fields of the data at a time.
> In particular, I noticed that there was spilling to disk on the map side of
> the aggregation.
>
> When I switched to DataFrames aggregation (in particular,
> DataFrame.groupBy(some list of keys).agg(exprs), where I passed a large
> number of “Sum” exprs), the execution began to choke. I saw that one of my
> executors had a long GC pause and my job wasn’t able to recover. However,
> when I reduced the number of Sum expressions being computed in the
> aggregation, the workflow started to work normally.
>
> I have a hypothesis that I would like to run by everyone. In
> org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I’m looking at
> the execution of Aggregation
> <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>,
> which appears to build the aggregation result in memory by iterating over
> the rows and updating a HashMap. This appears to be less robust than what
> would happen if PairRDDFunctions.combineByKey were used. With
> combineByKey, instead of two mapPartitions calls (assuming the aggregation
> is partially computable, as Sum is), the aggregation would be computed
> with the ExternalSorter and ExternalAppendOnlyMap objects. That would
> allow the aggregated result to grow large, since part of it could be
> spilled to disk. The in-memory approach seems especially bad if the
> aggregation reduction factor is low; that is, if there are many unique
> keys in the dataset. In particular, the HashMap holds O(# of keys *
> number of aggregated results per key) items in memory at a time.
>
> I was wondering what everyone’s thoughts on this problem are. Did we
> consciously think about the robustness implications when choosing to use an
> in-memory HashMap to compute the aggregation? Is this an inherent
> limitation of the aggregation implementation in DataFrames?
>
> Thanks,
>
> -Matt Cheah

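The footprint estimate in the quoted message, O(# of keys * number of aggregated results per key), can be illustrated with a small sketch in plain Python. It mimics the three functions a combineByKey call takes (create a combiner, merge a value, merge two combiners) for rows that carry several fields to sum. It does not use Spark, all function names are hypothetical, and nothing here spills; it only shows how the in-memory state grows.

```python
def create_combiner(fields):
    # First row seen for a key: start one running sum per field.
    return list(fields)


def merge_value(acc, fields):
    # Fold another row for the same key into the running sums, in place.
    for i, value in enumerate(fields):
        acc[i] += value
    return acc


def merge_combiners(left, right):
    # Combine two partial results for the same key (for example, from
    # two partitions).
    return [x + y for x, y in zip(left, right)]


def aggregate_partition(rows):
    """Partial aggregation of one partition: key -> list of running sums."""
    table = {}
    for key, fields in rows:
        if key in table:
            merge_value(table[key], fields)
        else:
            table[key] = create_combiner(fields)
    return table


def merge_partitions(left, right):
    """Final aggregation: merge two partial tables into a new one."""
    out = {key: list(sums) for key, sums in left.items()}
    for key, sums in right.items():
        out[key] = merge_combiners(out[key], sums) if key in out else list(sums)
    return out


def buffer_slots(table):
    # Number of running sums held in memory: keys * sums per key.
    return sum(len(sums) for sums in table.values())


part1 = [("k1", (1, 10, 100)), ("k2", (2, 20, 200)), ("k1", (3, 30, 300))]
part2 = [("k2", (4, 40, 400)), ("k3", (5, 50, 500))]
merged = merge_partitions(aggregate_partition(part1), aggregate_partition(part2))
```

Here `merged` holds three keys with three sums each, so `buffer_slots(merged)` is 9. Adding more Sum expressions or more distinct keys grows that product, which is the state a non-spilling hash map has to keep resident.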