spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY
Date Fri, 04 Nov 2016 19:57:26 GMT
okay i see the partition local sort. got it.

i would expect that pushing the partition local sort into shuffle would
give a significant boost. but thats just a guess.
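
for reference, a minimal sketch of the per-partition step in the repartition + sortWithinPartitions + mapPartitions approach. it is plain scala with no spark dependency, and it assumes the rows of a partition arrive already sorted by key (and then by value). the names SortedGroups and groupSorted are made up for illustration and are not part of any spark api.

```scala
object SortedGroups {
  /** Groups consecutive pairs that share a key.
    * Assumes the iterator is already sorted by key, so equal keys are adjacent.
    * Only one group's values are held in memory at a time.
    */
  def groupSorted[K, V](rows: Iterator[(K, V)]): Iterator[(K, Vector[V])] =
    new Iterator[(K, Vector[V])] {
      private val buffered = rows.buffered

      def hasNext: Boolean = buffered.hasNext

      def next(): (K, Vector[V]) = {
        val key = buffered.head._1 // throws NoSuchElementException if exhausted
        val values = Vector.newBuilder[V]
        // Consume rows until the key changes or the partition ends.
        while (buffered.hasNext && buffered.head._1 == key) {
          values += buffered.next()._2
        }
        (key, values.result())
      }
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for one partition's rows after a sort on (key, value).
    val partition = Iterator((1, "a"), (1, "b"), (2, "a"), (2, "c"))
    groupSorted(partition).foreach(println)
  }
}
```

in spark this would presumably be passed to mapPartitions after something like df.repartition(col("key")).sortWithinPartitions("key", "value"). note the sort has to include the key, not just the value: a hash partition can hold several keys, and sorting on value alone would interleave them.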

On Fri, Nov 4, 2016 at 2:39 PM, Michael Armbrust <michael@databricks.com>
wrote:

> sure, but then my values are not sorted per key, right?
>
>
> It does do a partition local sort. Look at the query plan in my example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>.
> The code here will also take care of finding the boundaries and is pretty
> careful to spill / avoid materializing unnecessarily.
>
> I think you are correct though that we are not pushing any of the sort
> into the shuffle.  I'm not sure how much that buys you.  If it's a lot we
> could extend the planner to look for Exchange->Sort pairs and change the
> exchange.
>
> On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers <koert@tresata.com> wrote:
>
>> i just noticed Sort for Dataset has a global flag. and Dataset also has
>> sortWithinPartitions.
>>
>> how about:
>> repartition + sortWithinPartitions + mapPartitions?
>>
>> the plan looks ok, but it is not clear to me if the sort is done as part
>> of the shuffle (which is the important optimization).
>>
>> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
>> "value")
>>
>> scala> df.repartition(2, col("key")).sortWithinPartitions("value").as[(String,
>> String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
>> == Physical Plan ==
>> *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top
>> level non-flat input object)._1, true) AS _1#39, staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
>> object)._2, true) AS _2#40]
>> +- MapPartitions <function1>, obj#38: scala.Tuple2
>>    +- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
>> scala.Tuple2
>>       +- *Sort [value#6 ASC], false, 0
>>          +- Exchange hashpartitioning(key#5, 2)
>>             +- LocalTableScan [key#5, value#6]
>>
>>
>>
>>
>> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers <koert@tresata.com> wrote:
>>
>>> sure, but then my values are not sorted per key, right?
>>>
>>> so a group by key with values sorted according to some ordering is an
>>> operation that can be done efficiently in a single shuffle without first
>>> figuring out range boundaries. and it is needed for quite a few algos,
>>> including Window and lots of timeseries stuff. but it seems there is no way
>>> to express i want to do this yet (at least not in an efficient way).
>>>
>>> which makes me wonder, what does Window do?
>>>
>>>
>>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <
>>> michael@databricks.com> wrote:
>>>
>>>> Thinking out loud is good :)
>>>>
>>>> You are right in that anytime you ask for a global ordering from Spark
>>>> you will pay the cost of figuring out the range boundaries for partitions.
>>>> If you say orderBy, though, we aren't sure that you aren't expecting a
>>>> global order.
>>>>
>>>> If you only want to make sure that items are colocated, it is cheaper
>>>> to do a groupByKey followed by a flatMapGroups
>>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>
>>>> .
>>>>
>>>>
>>>>
>>>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers <koert@tresata.com>
>>>> wrote:
>>>>
>>>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>>>> then do mapPartitions?
>>>>>
>>>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>>>
>>>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers <koert@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> thats an interesting thought about orderBy and mapPartitions. i guess
>>>>>> i could emulate a groupBy with secondary sort using those two. however
>>>>>> isn't using an orderBy expensive since it is a total sort? i mean a
>>>>>> groupBy with secondary sort is also a total sort under the hood, but
>>>>>> its on (hashCode(key), secondarySortColumn) which is easier to
>>>>>> distribute and therefore can be implemented more efficiently.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
>>>>>> michael@databricks.com> wrote:
>>>>>>
>>>>>>>> It is still unclear to me why we should remember all these tricks
>>>>>>>> (or add lots of extra little functions) when this elegantly can be
>>>>>>>> expressed in a reduce operation with a simple one line lambda
>>>>>>>> function.
>>>>>>>>
>>>>>>> I think you can do that too.  KeyValueGroupedDataset has a
>>>>>>> reduceGroups function.  This probably won't be as fast though because
>>>>>>> you end up creating objects whereas the version I gave will get
>>>>>>> codegened to operate on binary data the whole way through.
>>>>>>>
>>>>>>>> The same applies to these Window functions. I had to read it 3
>>>>>>>> times to understand what it all means. Maybe it makes sense for
>>>>>>>> someone who has been forced to use such limited tools in sql for
>>>>>>>> many years but that's not necessarily what we should aim for. Why
>>>>>>>> can I not just have the sortBy and then an Iterator[X] =>
>>>>>>>> Iterator[Y] to express what I want to do?
>>>>>>>>
>>>>>>> We also have orderBy and mapPartitions.
>>>>>>>
>>>>>>>> All these functions (rank etc.) can be trivially expressed in this,
>>>>>>>> plus I can add other operations if needed, instead of being locked
>>>>>>>> in like this Window framework.
>>>>>>>>
>>>>>>> I agree that window functions would probably not be my first choice
>>>>>>> for many problems, but for people coming from SQL it was a very
>>>>>>> popular feature.  My real goal is to give as many paradigms as
>>>>>>> possible in a single unified framework.  Let people pick the right
>>>>>>> mode of expression for any given job :)
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
