spark-user mailing list archives

From Silvio Fiorito <silvio.fior...@granturing.com>
Subject Re: Joining to a large, pre-sorted file
Date Sun, 13 Nov 2016 15:24:04 GMT
Hi Stuart,

Yes that's the query plan but if you take a look at my screenshot it skips the first stage
since the datasets are co-partitioned.

Thanks,
Silvio

________________________________
From: Stuart White <stuart.white1@gmail.com>
Sent: Saturday, November 12, 2016 11:20:28 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: Joining to a large, pre-sorted file

Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what I'm seeing.

Remember my goal is to sort master, write it out, later read it back in and have Spark "remember"
that it's sorted, so I can do joins and Spark will not sort it again.

Looking at the explain plan for the example job you provided, it looks to me like Spark is
re-sorting master after reading it back in.  See the attachment for the Sort step I'm referring
to.

Am I misunderstanding the explain plan?

Thanks again!

On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <silvio.fiorito@granturing.com> wrote:

Hi Stuart,

You don’t need the sortBy or sortWithinPartitions.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html

This is what the job should look like:

[Attached screenshot: image001.png]
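
For readers who cannot open the linked notebook, here is a minimal sketch of a co-partitioned join. It is not a copy of the notebook. It assumes both sides are written with bucketBy() using the same bucket count and key, and are read back with spark.table() so that the bucketing metadata stored in the catalog is visible to the planner. The "transaction" table name, the local master setting, and the decision to disable broadcast joins are assumptions made for illustration. The snippet is written to be pasted into spark-shell.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that already exists.
val spark = SparkSession.builder()
  .appName("bucketed-join-sketch")
  .master("local[*]") // assumption: local test run
  .getOrCreate()
import spark.implicits._

// Assumption: turn off broadcast joins so this small test data
// is planned as a SortMergeJoin, as in the thread.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Write both sides with the same bucket count on the join key.
Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .mode("overwrite")
  .format("json")
  .bucketBy(3, "key")
  .saveAsTable("master")

Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .write
  .mode("overwrite")
  .format("json")
  .bucketBy(3, "key")
  .saveAsTable("transaction") // hypothetical table name

// Read through the catalog, not by file path, so the bucket spec is kept.
val master = spark.table("master")
val transaction = spark.table("transaction")

val results = master.join(transaction, "key")
results.explain()
```

If the bucketing is picked up, the Exchange operators should no longer appear under SortMergeJoin. Whether a Sort operator still appears can depend on the Spark version and on how many files each bucket contains, so it is worth checking the plan on the version in use.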

On 11/12/16, 8:40 AM, "Stuart White" <stuart.white1@gmail.com> wrote:

    Thanks for the reply.

    I understand that I need to use bucketBy() to write my master file,
    but I still can't seem to make it work as expected.  Here's a code
    example for how I'm writing my master file:

    Range(0, 1000000)
      .map(i => (i, s"master_$i"))
      .toDF("key", "value")
      .write
      .format("json")
      .bucketBy(3, "key")
      .sortBy("key")
      .saveAsTable("master")

    And here's how I'm reading it later and attempting to join to a
    transaction dataset:

    val master = spark
      .read
      .format("json")
      .json("spark-warehouse/master")
      .cache

    val transaction = Range(0, 1000000)
      .map(i => (i, s"transaction_$i"))
      .toDF("key", "value")
      .repartition(3, 'key)
      .sortWithinPartitions('key)
      .cache

    val results = master.join(transaction, "key")

    When I call results.explain(), I see that it is sorting both datasets
    before sending them through SortMergeJoin.

    == Physical Plan ==
    *Project [key#0L, value#1, value#53]
    +- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
       :- *Sort [key#0L ASC], false, 0
       :  +- Exchange hashpartitioning(key#0L, 200)
       :     +- *Filter isnotnull(key#0L)
       :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
       :           :  +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
       :           :     :  +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
       +- *Sort [cast(key#52 as bigint) ASC], false, 0
          +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
             +- InMemoryTableScan [key#52, value#53]
                :  +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                :     :  +- *Sort [key#52 ASC], false, 0
                :     :     +- Exchange hashpartitioning(key#52, 3)
                :     :        +- LocalTableScan [key#52, value#53]

    Here are my thoughts:

    1. I think I'm probably reading the master file back into memory
    incorrectly.  I think maybe I should be reading it as a Hive table
    rather than just a plain json file, but I can't seem to figure out how
    to do that.

    2. I don't understand exactly when partition counts/bucket counts are
    important.  For example, in this example, at the time it's written,
    master has 1 partition and is written into 3 buckets, resulting in 3
    files being written out.  Later when I generated my transaction
    dataset, I repartitioned it into 3 partitions.  Was that the correct
    thing to do (3 transaction partitions == 3 master buckets)?  Or should
    I have repartitioned master into 3 partitions before writing
    (resulting in 9 files if I still create 3 buckets)?  Basically, I
    don't understand how partitions and buckets should be handled.
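
On the second question, one way to see how partitions and buckets interact at write time is to count the files each write produces. The sketch below is only an illustration under stated assumptions: the table names "master_unaligned" and "master_aligned" are hypothetical, the partition count of 4 is arbitrary, and the snippet is meant to be pasted into spark-shell. Each write task emits one file per bucket for which it holds rows, so a dataset with N partitions written into B buckets can produce up to N x B files, while repartitioning by the bucket column first tends to keep the file count close to the bucket count.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the session that already exists.
val spark = SparkSession.builder()
  .appName("bucket-file-count-sketch")
  .master("local[*]") // assumption: local test run
  .getOrCreate()
import spark.implicits._

val df = Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")

// Round-robin repartitioning: every partition is likely to hold rows
// for every bucket, so each task writes a file for each bucket.
df.repartition(4)
  .write
  .mode("overwrite")
  .format("json")
  .bucketBy(3, "key")
  .saveAsTable("master_unaligned") // hypothetical table name

// Repartitioning by the bucket column groups rows by key hash
// before the write, so fewer files per bucket are expected.
df.repartition(3, $"key")
  .write
  .mode("overwrite")
  .format("json")
  .bucketBy(3, "key")
  .saveAsTable("master_aligned") // hypothetical table name

// inputFiles lists the data files backing each table.
println(spark.table("master_unaligned").inputFiles.length)
println(spark.table("master_aligned").inputFiles.length)
```

Comparing the two printed counts shows the effect of the in-memory partitioning on the number of files written. The bucket count itself is what the planner uses when the table is later read back through the catalog.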



    So, I feel like I'm close, but there are a few ways in which I don't
    understand how these pieces are supposed to fit together.  If this is
    explained somewhere, with a simple example, that would be great.



