spark-user mailing list archives

From "Cheng, Hao" <hao.ch...@intel.com>
Subject RE: Sort Merge Join
Date Tue, 03 Nov 2015 01:03:24 GMT
Not as far as I can tell. @Michael @YinHuai @Reynold, any comments on this optimization?

From: Jonathan Coveney [mailto:jcoveney@gmail.com]
Sent: Tuesday, November 3, 2015 4:17 AM
To: Alex Nastetsky
Cc: Cheng, Hao; user
Subject: Re: Sort Merge Join

Additionally, I'm curious whether there are any JIRAs around making DataFrames support ordering
better. There are a lot of operations that can be optimized if you know that you have a total
ordering on your data. Are there any plans, or at least JIRAs, around having the Catalyst
optimizer handle this case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky <alex.nastetsky@vervemobile.com>:
Thanks for the response.

Treating a file system based data source as “UnknownPartitioning” is a simple and SAFE way
to handle a JOIN, as it's hard to guarantee that records from different data sets with
identical join keys will be loaded by the same node/task, since lots of factors need to be
considered: task pool size, cluster size, source format, storage, data locality, etc.
I agree it's worth optimizing for performance, and in Hive this is actually called a bucket
join. I am not sure whether that will happen soon in Spark SQL.

Yes, this is supported in

  *   Hive with bucket join
  *   Pig with USING "merge" (https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins)
  *   MR with CompositeInputFormat
But I guess it's not supported in Spark?

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao <hao.cheng@intel.com> wrote:
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For example, in the code
below, the two datasets have different numbers of partitions, but it still does a SortMergeJoin
after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either hash-based or sort-based) requires that records
with identical join keys MUST BE shuffled to the same “reducer” node/task; hashpartitioning
is just a strategy that tells the Spark shuffle service how to achieve that. In theory we could
even use `RangePartitioning` instead (but it's less efficient, which is why we don't choose
it for JOIN). So conceptually the JOIN operator doesn't care much about the shuffle strategy,
as long as it satisfies the demand on data distribution.
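
To make the requirement concrete, here is a minimal sketch at the RDD level (hypothetical
key/value data, outside the DataFrame planner): shuffle both sides once with the same
partitioner, and the join itself adds no further shuffle.

    // Minimal RDD-level sketch (hypothetical data) of the distribution
    // requirement above: partition both sides with the SAME partitioner,
    // and the subsequent join is a narrow dependency with no extra shuffle.
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setAppName("CoPartitionedJoin").setMaster("local[2]"))

    val byKey = new HashPartitioner(4)
    val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(byKey)
    val right = sc.parallelize(Seq(1 -> "x", 2 -> "y")).partitionBy(byKey)

    // left.partitioner == right.partitioner, so records with identical keys
    // already sit in the same task; the join reuses the existing layout.
    left.join(right).collect().foreach(println)

    sc.stop()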

2) If both datasets have already been partitioned/sorted the same way and stored on the file
system (e.g. by a previous job), is there a way to tell Spark this so that it won't do a
"hashpartitioning" on them? It looks like Spark considers datasets that have just been read
from the file system to have UnknownPartitioning. In the example below, I try to join a
dataframe to itself, and it still wants to hash repartition.

[Hao:] Take this as an example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
    SortMergeJoin [key#19], [key#21]
     TungstenSort [key#19 ASC], false, 0
      TungstenExchange hashpartitioning(key#19,200)
       ConvertToUnsafe
        HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
     TungstenSort [key#21 ASC], false, 0
      TungstenExchange hashpartitioning(key#21,200)
       ConvertToUnsafe
        HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
    ConvertToUnsafe
     HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b ON a.key=b.key”,
as we didn't change the data distribution after it, so we can join another table (“JOIN
src c ON b.key=c.key”) directly; only table “c” needs repartitioning on “key”.
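
Roughly speaking, the planner inserts an Exchange only when a child's output partitioning
does not already satisfy the distribution its parent requires. Here is a toy Scala model of
that rule (hypothetical classes, NOT Spark's internals) that reproduces the shape of the
plan above:

    // Toy model: an Exchange is inserted only when the child's output
    // partitioning does not already satisfy the required distribution.
    case class ClusteredBy(keys: Seq[String])

    sealed trait Plan { def outputPartitioning: Option[ClusteredBy] }
    case class Scan(table: String) extends Plan {
      val outputPartitioning = None // freshly read data: unknown partitioning
    }
    case class Exchange(keys: Seq[String], child: Plan) extends Plan {
      val outputPartitioning = Some(ClusteredBy(keys))
    }
    case class MergeJoin(keys: Seq[String], left: Plan, right: Plan) extends Plan {
      // a sort-merge join preserves its inputs' clustering on the join keys
      val outputPartitioning = Some(ClusteredBy(keys))
    }

    def ensure(required: ClusteredBy, child: Plan): Plan =
      if (child.outputPartitioning.contains(required)) child // reuse, no shuffle
      else Exchange(required.keys, child)

    val need = ClusteredBy(Seq("key"))
    // src a JOIN src b: both scans have unknown partitioning, so both get an Exchange
    val ab = MergeJoin(Seq("key"), ensure(need, Scan("a")), ensure(need, Scan("b")))
    // ... JOIN src c: ab already clusters on "key", so only c's scan is shuffled
    val abc = MergeJoin(Seq("key"), ensure(need, ab), ensure(need, Scan("c")))
    println(abc)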

Treating a file system based data source as “UnknownPartitioning” is a simple and SAFE way
to handle a JOIN, as it's hard to guarantee that records from different data sets with
identical join keys will be loaded by the same node/task, since lots of factors need to be
considered: task pool size, cluster size, source format, storage, data locality, etc.
I agree it's worth optimizing for performance, and in Hive this is actually called a bucket
join. I am not sure whether that will happen soon in Spark SQL.
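
For reference, on the Hive side the prerequisite is declared in the table itself; here is a
sketch (hypothetical table and column names) of the DDL, issued through a HiveContext. Both
join sides must be bucketed AND sorted the same way on the join key for Hive's bucket join
to apply.

    // Sketch of a Hive bucketed-table declaration (hypothetical names).
    // Hive can join two tables bucketed/sorted like this on "key" without
    // an extra shuffle; Spark SQL can read such tables but its planner
    // does not yet use the bucketing to elide the hashpartitioning.
    sqlContext.sql("""
      CREATE TABLE src_bucketed (key INT, value STRING)
      CLUSTERED BY (key) SORTED BY (key ASC) INTO 8 BUCKETS
    """)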

Hao

From: Alex Nastetsky [mailto:alex.nastetsky@vervemobile.com]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For example, in the code
below, the two datasets have different numbers of partitions, but it still does a SortMergeJoin
after a "hashpartitioning".

CODE:
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sparkConf = new SparkConf()
      .setAppName("SortMergeJoinTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.eventLog.enabled", "true")
      .set("spark.sql.planner.sortMergeJoin", "true")

    sparkConf.setMaster("local-cluster[3,1,1024]")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val inputpath = "input.gz.parquet"

    val df1 = sqlContext.read.parquet(inputpath).repartition(3)
    val df2 = sqlContext.read.parquet(inputpath).repartition(5)
    val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" === $"foo2")
    result.explain()

OUTPUT:
    == Physical Plan ==
    SortMergeJoin [foo#0], [foo2#8]
    TungstenSort [foo#0 ASC], false, 0
      TungstenExchange hashpartitioning(foo#0)
      ConvertToUnsafe
            Repartition 3, true
            Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
    TungstenSort [foo2#8 ASC], false, 0
      TungstenExchange hashpartitioning(foo2#8)
      TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
            Repartition 5, true
            Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been partitioned/sorted the same way and stored on the file
system (e.g. by a previous job), is there a way to tell Spark this so that it won't do a
"hashpartitioning" on them? It looks like Spark considers datasets that have just been read
from the file system to have UnknownPartitioning. In the example below, I try to join a
dataframe to itself, and it still wants to hash repartition.

CODE:
    ...
    val df1 = sqlContext.read.parquet(inputpath)
    val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" === $"foo2")
    result.explain()

OUTPUT:
    == Physical Plan ==
    SortMergeJoin [foo#0], [foo2#4]
    TungstenSort [foo#0 ASC], false, 0
      TungstenExchange hashpartitioning(foo#0)
      ConvertToUnsafe
            Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
    TungstenSort [foo2#4 ASC], false, 0
      TungstenExchange hashpartitioning(foo2#4)
      ConvertToUnsafe
            Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
            Scan ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]


Thanks.

