spark-dev mailing list archives

From Reynold Xin
Subject Re: Preserving partitioning with dataframe select
Date Mon, 08 Feb 2016 07:11:58 GMT

Thanks for the email. Are you just asking whether it should work, or
reporting that it doesn't work?

Internally, the way we track physical data distribution should make the
scenarios described work. If it doesn't, we should make them work.
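
A rough way to check this on a given build is to print the physical plan of
the join and look at where Exchange (shuffle) operators appear. The sketch
below is only an illustration under stated assumptions: it targets the Spark
1.6 API (SQLContext), it assumes DataFrame.repartition($"X") is the
pre-partitioning call meant in the quoted message, and the object name, the
helper joinWithTotals, the columns V and total, and the toy data are all made
up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.sum

object PartitioningCheck {
  // Hypothetical helper: builds the partition -> aggregate -> rename -> join
  // pipeline from the quoted message on a tiny in-memory data set.
  def joinWithTotals(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._

    // Disable broadcast joins so the plan shows a shuffle-based join.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Toy data (assumption): X is the key, V is a value to aggregate.
    val dataFrame = sqlContext.sparkContext
      .parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4)))
      .toDF("X", "V")

    // Assumption: repartition by column stands in for the pre-partitioning step.
    val partitioned = dataFrame.repartition($"X")
    val grouped = partitioned.groupBy($"X").agg(sum($"V").as("total"))

    // Rename "X" to "Y" so the join columns are unique.
    val renamed = grouped.withColumnRenamed("X", "Y")

    // Attach the per-key aggregate to every original row.
    partitioned.join(renamed, $"X" === $"Y")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partitioning-check")
    val sc = new SparkContext(conf)
    try {
      // Print the physical plan; each Exchange operator in it is a shuffle.
      joinWithTotals(new SQLContext(sc)).explain()
    } finally {
      sc.stop()
    }
  }
}
```

If the plan shows a single Exchange feeding both sides of the join, the
partitioning survived the rename; additional Exchange operators directly
under the join would indicate that it was lost.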

On Sat, Feb 6, 2016 at 6:49 AM, Matt Cheah wrote:

> Hi everyone,
> When using raw RDDs, it is possible to have a map() operation indicate
> that the partitioning for the RDD would be preserved by the map operation.
> This makes it easier to reduce the overhead of shuffles by ensuring that
> RDDs are co-partitioned when they are joined.
> With Data Frames, I pre-partition the data frame using
> DataFrame.partitionBy($"X"), but I then invoke a select statement after the
> partitioning and before joining that data frame with another. Roughly
> speaking, I'm doing something like this pseudo-code:
> partitionedDataFrame = dataFrame.partitionBy($"X")
> groupedDataFrame = partitionedDataFrame.groupBy($"X").agg(aggregations)
> // Rename "X" to "Y" to make sure columns are unique
> groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y")
> // Roughly speaking, join on "X == Y" to get the aggregation results onto
> // every row
> joinedDataFrame = partitionedDataFrame.join(groupedDataFrameRenamed)
> However, the renaming of the columns maps to a select statement, and to my
> knowledge, selecting the columns throws off the partitioning, which
> results in shuffling both the partitionedDataFrame and the groupedDataFrame.
> I have the following questions given this example:
> 1) Is pre-partitioning the Data Frame effective? In other words, does the
> physical planner recognize when underlying RDDs are co-partitioned and
> compute more efficient joins by reducing the amount of data that is
> shuffled?
> 2) If the planner takes advantage of co-partitioning, is the renaming of
> the columns invalidating the partitioning of the grouped Data Frame? When I
> look at the planner's conversion from logical.Project to the physical plan,
> I only see it invoking child.mapPartitions without specifying the
> preservesPartitioning flag.
> Thanks,
> -Matt Cheah
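
For reference, the RDD-side behaviour mentioned in the quoted message can be
sketched as follows. In the RDD API the preservesPartitioning flag is a
parameter of mapPartitions; mapValues keeps the partitioner because it cannot
change keys, and a plain map drops it. The object name and the toy data are
made up for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PreservePartitioning {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("preserve-partitioning")
    val sc = new SparkContext(conf)
    try {
      // Toy pair RDD (assumption), hash-partitioned by key into 4 partitions.
      val partitioned = sc
        .parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
        .partitionBy(new HashPartitioner(4))

      // map() could change the keys, so the result carries no partitioner.
      val viaMap = partitioned.map { case (k, v) => (k, v + 1) }

      // mapValues() cannot touch the keys, so the partitioner is kept.
      val viaMapValues = partitioned.mapValues(_ + 1)

      // mapPartitions() keeps the partitioner only when told that keys are
      // unchanged; the caller is responsible for that promise being true.
      val viaMapPartitions = partitioned.mapPartitions(
        iter => iter.map { case (k, v) => (k, v + 1) },
        preservesPartitioning = true)

      assert(viaMap.partitioner.isEmpty)
      assert(viaMapValues.partitioner == partitioned.partitioner)
      assert(viaMapPartitions.partitioner == partitioned.partitioner)
    } finally {
      sc.stop()
    }
  }
}
```

Two pair RDDs that share the same partitioner can be joined without
reshuffling either side, which is the property the DataFrame questions above
are asking about.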
