spark-user mailing list archives

From Stuart White <stuart.whi...@gmail.com>
Subject Re: Joining to a large, pre-sorted file
Date Sat, 12 Nov 2016 16:20:28 GMT
Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what
I'm seeing.

Remember, my goal is to sort master, write it out, and later read it back
in and have Spark "remember" that it's sorted, so that when I join to it
Spark will not sort it again.
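
For reference, here's a minimal sketch of the behavior I'm hoping for.  I'm
assuming (but am not sure) that the key is to read master back through the
catalog with spark.table() rather than pointing spark.read at the files:

// master was written (see the code quoted below) with
//   .bucketBy(3, "key").sortBy("key").saveAsTable("master")
// Read it back through the catalog so the planner can see that metadata.
val master = spark.table("master")

// Build a transaction DataFrame (spark-shell implicits assumed) and join;
// ideally no Sort/Exchange would appear on the master side of the plan.
val transaction = Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
master.join(transaction, "key").explain()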

Looking at the explain plan for the example job you provided, it looks to
me like Spark is re-sorting master after reading it back in.  See the
attachment for the Sort step I'm referring to.

Am I misunderstanding the explain plan?
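
(One sanity check I've been using, in case it's relevant: dumping the table's
catalog metadata to confirm that the bucket and sort columns were actually
recorded.  I'm assuming DESCRIBE FORMATTED shows that in this version; if
not, DESCRIBE EXTENDED should.)

// Show the catalog metadata for the bucketed table; the bucket count and
// bucket/sort columns should appear here if saveAsTable recorded them.
spark.sql("DESCRIBE FORMATTED master").show(100, false)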

Thanks again!

On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <silvio.fiorito@granturing.com> wrote:

> Hi Stuart,
>
> You don’t need the sortBy or sortWithinPartitions.
>
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html
>
> This is what the job should look like:
>
> On 11/12/16, 8:40 AM, "Stuart White" <stuart.white1@gmail.com> wrote:
>
>     Thanks for the reply.
>
>     I understand that I need to use bucketBy() to write my master file,
>     but I still can't seem to make it work as expected.  Here's a code
>     example for how I'm writing my master file:
>
>     Range(0, 1000000)
>       .map(i => (i, s"master_$i"))
>       .toDF("key", "value")
>       .write
>       .format("json")
>       .bucketBy(3, "key")
>       .sortBy("key")
>       .saveAsTable("master")
>
>     And here's how I'm reading it later and attempting to join to a
>     transaction dataset:
>
>     val master = spark
>       .read
>       .format("json")
>       .json("spark-warehouse/master")
>       .cache
>
>     val transaction = Range(0, 1000000)
>       .map(i => (i, s"transaction_$i"))
>       .toDF("key", "value")
>       .repartition(3, 'key)
>       .sortWithinPartitions('key)
>       .cache
>
>     val results = master.join(transaction, "key")
>
>     When I call results.explain(), I see that it is sorting both datasets
>     before sending them through SortMergeJoin.
>
>     == Physical Plan ==
>     *Project [key#0L, value#1, value#53]
>     +- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
>        :- *Sort [key#0L ASC], false, 0
>        :  +- Exchange hashpartitioning(key#0L, 200)
>        :     +- *Filter isnotnull(key#0L)
>        :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
>        :           :  +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>        :           :     :  +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
>        +- *Sort [cast(key#52 as bigint) ASC], false, 0
>           +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
>              +- InMemoryTableScan [key#52, value#53]
>                 :  +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                 :     :  +- *Sort [key#52 ASC], false, 0
>                 :     :     +- Exchange hashpartitioning(key#52, 3)
>                 :     :        +- LocalTableScan [key#52, value#53]
>
>     Here are my thoughts:
>
>     1. I think I'm probably reading the master file back into memory
>     incorrectly.  I think maybe I should be reading it as a Hive table
>     rather than just a plain json file, but I can't seem to figure out how
>     to do that.  (My best guess is sketched just after this list, but I
>     haven't verified it.)
>
>     2. I don't understand exactly when partition counts/bucket counts are
>     important.  In this example, at the time it's written, master has 1
>     partition and is written into 3 buckets, resulting in 3 files being
>     written out.  Later, when I generated my transaction dataset, I
>     repartitioned it into 3 partitions.  Was that the correct thing to do
>     (3 transaction partitions == 3 master buckets)?  Or should I have
>     repartitioned master into 3 partitions before writing (resulting in 9
>     files if I still create 3 buckets)?  Basically, I don't understand how
>     partitions and buckets should be handled.
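>
>     My best guess at the fix (just a sketch; the table names mirror the code
>     above, and I haven't verified that it actually makes the Sort go away) is
>     to read master back through the catalog with spark.table, and to bucket
>     the transaction side the same way instead of repartitioning it by hand:
>
>     // Read master via the catalog so its bucket/sort metadata is visible to
>     // the planner (instead of using spark.read.json on the warehouse path).
>     val master = spark.table("master")
>
>     // Write transaction as a bucketed, sorted table with the same bucket
>     // count and key, then read it back the same way.
>     Range(0, 1000000)
>       .map(i => (i, s"transaction_$i"))
>       .toDF("key", "value")
>       .write
>       .format("json")
>       .bucketBy(3, "key")
>       .sortBy("key")
>       .saveAsTable("transaction")
>     val transaction = spark.table("transaction")
>
>     val results = master.join(transaction, "key")
>     results.explain()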
>
>     So, I feel like I'm close, but there are a few ways in which I don't
>     understand how these pieces are supposed to fit together.  If this is
>     explained somewhere, with a simple example, that would be great.
