spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alex Nastetsky <alex.nastet...@vervemobile.com>
Subject Sort Merge Join
Date Mon, 02 Nov 2015 03:29:26 GMT
Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
example, in the code below, the two datasets have different number of
partitions, but it still does a SortMerge join after a "hashpartitioning".


CODE:
   val sparkConf = new SparkConf()
      .setAppName("SortMergeJoinTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.eventLog.enabled", "true")
      .set("spark.sql.planner.sortMergeJoin","true")

    sparkConf.setMaster("local-cluster[3,1,1024]")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val inputpath = input.gz.parquet

    val df1 = sqlContext.read.parquet(inputpath).repartition(3)
    val df2 = sqlContext.read.parquet(inputpath).repartition(5)
    val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
    result.explain()

OUTPUT:
    == Physical Plan ==
    SortMergeJoin [foo#0], [foo2#8]
    TungstenSort [foo#0 ASC], false, 0
      TungstenExchange hashpartitioning(foo#0)
      ConvertToUnsafe
Repartition 3, true
Scan
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
    TungstenSort [foo2#8 ASC], false, 0
      TungstenExchange hashpartitioning(foo2#8)
      TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
Repartition 5, true
Scan
ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been previously partitioned/sorted the
same and stored on the file system (e.g. in a previous job), is there a way
to tell Spark this so that it won't want to do a "hashpartitioning" on
them? It looks like Spark just considers datasets that have been just read
from the the file system to have UnknownPartitioning. In the example below,
I try to join a dataframe to itself, and it still wants to hash repartition.

CODE:
    ...
    val df1 = sqlContext.read.parquet(inputpath)
    val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
    result.explain()

OUTPUT:
    == Physical Plan ==
    SortMergeJoin [foo#0], [foo2#4]
    TungstenSort [foo#0 ASC], false, 0
      TungstenExchange hashpartitioning(foo#0)
      ConvertToUnsafe
Scan
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
    TungstenSort [foo2#4 ASC], false, 0
      TungstenExchange hashpartitioning(foo2#4)
      ConvertToUnsafe
Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
Scan
ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]


Thanks.

Mime
View raw message