spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jonathan Coveney <jcove...@gmail.com>
Subject Re: Sort Merge Join
Date Mon, 02 Nov 2015 20:17:24 GMT
Additionally, I'm curious if there are any JIRAS around making dataframes
support ordering better? there are a lot of operations that can be
optimized if you know that you have a total ordering on your data...are
there any plans, or at least JIRAS, around having the catalyst optimizer
handle this case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky <alex.nastetsky@vervemobile.com>:

> Thanks for the response.
>
> Taking the file system based data source as “UnknownPartitioning”, will
> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
> from different data sets with the identical join keys will be loaded by the
> same node/task , since lots of factors need to be considered, like task
> pool size, cluster size, source format, storage, data locality etc.,.
>
> I’ll agree it’s worth to optimize it for performance concerns, and
> actually in Hive, it is called bucket join. I am not sure will that happens
> soon in Spark SQL.
>
>
> Yes, this is supported in
>
>    - Hive with bucket join
>    - Pig with USING "merge"
>    <https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins>
>    - MR with CompositeInputFormat
>
> But I guess it's not supported in Spark?
>
> On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao <hao.cheng@intel.com> wrote:
>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
>> requires the records with the identical join keys MUST BE shuffled to the
>> same “reducer” node / task, hashpartitioning is just a strategy to tell
>> spark shuffle service how to achieve that, in theory, we even can use the
>> `RangePartitioning` instead (but it’s less efficient, that’s why we don’t
>> choose it for JOIN). So conceptually the JOIN operator doesn’t care so much
>> about the shuffle strategy so much if it satisfies the demand on data
>> distribution.
>>
>>
>>
>> 2) If both datasets have already been previously partitioned/sorted the
>> same and stored on the file system (e.g. in a previous job), is there a way
>> to tell Spark this so that it won't want to do a "hashpartitioning" on
>> them? It looks like Spark just considers datasets that have been just read
>> from the the file system to have UnknownPartitioning. In the example below,
>> I try to join a dataframe to itself, and it still wants to hash repartition.
>>
>>
>>
>> [Hao:] Take this as example:
>>
>>
>>
>> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
>> a.key=b.key JOIN src c ON b.key=c.key
>>
>>
>>
>> == Physical Plan ==
>>
>> TungstenProject [value#20,value#22,value#24]
>>
>> SortMergeJoin [key#21], [key#23]
>>
>>   TungstenSort [key#21 ASC], false, 0
>>
>>    TungstenProject [key#21,value#22,value#20]
>>
>>     SortMergeJoin [key#19], [key#21]
>>
>>      TungstenSort [key#19 ASC], false, 0
>>
>>       TungstenExchange hashpartitioning(key#19,200)
>>
>>        ConvertToUnsafe
>>
>>         HiveTableScan [key#19,value#20], (MetastoreRelation default, src,
>> Some(a))
>>
>>      TungstenSort [key#21 ASC], false, 0
>>
>>       TungstenExchange hashpartitioning(key#21,200)
>>
>>        ConvertToUnsafe
>>
>>         HiveTableScan [key#21,value#22], (MetastoreRelation default, src,
>> Some(b))
>>
>>   TungstenSort [key#23 ASC], false, 0
>>
>>    TungstenExchange hashpartitioning(key#23,200)
>>
>>     ConvertToUnsafe
>>
>>      HiveTableScan [key#23,value#24], (MetastoreRelation default, src,
>> Some(c))
>>
>>
>>
>> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
>> src b ON a.key=b.key”, as we didn’t change the data distribution after
>> it, so we can join another table “JOIN src c ON b.key=c.key” directly,
>> which only require the table “c” for repartitioning on “key”.
>>
>>
>>
>> Taking the file system based data source as “UnknownPartitioning”, will
>> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
>> from different data sets with the identical join keys will be loaded by the
>> same node/task , since lots of factors need to be considered, like task
>> pool size, cluster size, source format, storage, data locality etc.,.
>>
>> I’ll agree it’s worth to optimize it for performance concerns, and
>> actually in Hive, it is called bucket join. I am not sure will that happens
>> soon in Spark SQL.
>>
>>
>>
>> Hao
>>
>>
>>
>> *From:* Alex Nastetsky [mailto:alex.nastetsky@vervemobile.com]
>> *Sent:* Monday, November 2, 2015 11:29 AM
>> *To:* user
>> *Subject:* Sort Merge Join
>>
>>
>>
>> Hi,
>>
>>
>>
>> I'm trying to understand SortMergeJoin (SPARK-2213).
>>
>>
>>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> CODE:
>>
>>    val sparkConf = new SparkConf()
>>
>>       .setAppName("SortMergeJoinTest")
>>
>>       .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>
>>       .set("spark.eventLog.enabled", "true")
>>
>>       .set("spark.sql.planner.sortMergeJoin","true")
>>
>>
>>
>>     sparkConf.setMaster("local-cluster[3,1,1024]")
>>
>>
>>
>>     val sc = new SparkContext(sparkConf)
>>
>>     val sqlContext = new SQLContext(sc)
>>
>>     import sqlContext.implicits._
>>
>>
>>
>>     val inputpath = input.gz.parquet
>>
>>
>>
>>     val df1 = sqlContext.read.parquet(inputpath).repartition(3)
>>
>>     val df2 = sqlContext.read.parquet(inputpath).repartition(5)
>>
>>     val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" ===
>> $"foo2")
>>
>>     result.explain()
>>
>>
>>
>> OUTPUT:
>>
>>     == Physical Plan ==
>>
>>     SortMergeJoin [foo#0], [foo2#8]
>>
>>     TungstenSort [foo#0 ASC], false, 0
>>
>>       TungstenExchange hashpartitioning(foo#0)
>>
>>       ConvertToUnsafe
>>
>>             Repartition 3, true
>>
>>             Scan
>> ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
>>
>>     TungstenSort [foo2#8 ASC], false, 0
>>
>>       TungstenExchange hashpartitioning(foo2#8)
>>
>>       TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
>>
>>             Repartition 5, true
>>
>>             Scan
>> ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]
>>
>>
>>
>> 2) If both datasets have already been previously partitioned/sorted the
>> same and stored on the file system (e.g. in a previous job), is there a way
>> to tell Spark this so that it won't want to do a "hashpartitioning" on
>> them? It looks like Spark just considers datasets that have been just read
>> from the the file system to have UnknownPartitioning. In the example below,
>> I try to join a dataframe to itself, and it still wants to hash repartition.
>>
>>
>>
>> CODE:
>>
>>     ...
>>
>>     val df1 = sqlContext.read.parquet(inputpath)
>>
>>     val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" ===
>> $"foo2")
>>
>>     result.explain()
>>
>>
>>
>> OUTPUT:
>>
>>     == Physical Plan ==
>>
>>     SortMergeJoin [foo#0], [foo2#4]
>>
>>     TungstenSort [foo#0 ASC], false, 0
>>
>>       TungstenExchange hashpartitioning(foo#0)
>>
>>       ConvertToUnsafe
>>
>>             Scan
>> ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
>>
>>     TungstenSort [foo2#4 ASC], false, 0
>>
>>       TungstenExchange hashpartitioning(foo2#4)
>>
>>       ConvertToUnsafe
>>
>>             Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
>>
>>             Scan
>> ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]
>>
>>
>>
>>
>>
>> Thanks.
>>
>
>

Mime
View raw message