spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Koert Kuipers <ko...@tresata.com>
Subject Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY
Date Fri, 04 Nov 2016 14:06:00 GMT
i just noticed Sort for Dataset has a global flag. and Dataset also has
sortWithinPartitions.

how about:
repartition + sortWithinPartitions + mapPartitions?

the plan looks ok, but it is not clear to me if the sort is done as part of
the shuffle (which is the important optimization).

scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
"value")

scala> df.repartition(2,
col("key")).sortWithinPartitions("value").as[(String,
String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
== Physical Plan ==
*SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
object)._1, true) AS _1#39, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
object)._2, true) AS _2#40]
+- MapPartitions <function1>, obj#38: scala.Tuple2
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
scala.Tuple2
      +- *Sort [value#6 ASC], false, 0
         +- Exchange hashpartitioning(key#5, 2)
            +- LocalTableScan [key#5, value#6]




On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers <koert@tresata.com> wrote:

> sure, but then my values are not sorted per key, right?
>
> so a group by key with values sorted according to to some ordering is an
> operation that can be done efficiently in a single shuffle without first
> figuring out range boundaries. and it is needed for quite a few algos,
> including Window and lots of timeseries stuff. but it seems there is no way
> to express i want to do this yet (at least not in an efficient way).
>
> which makes me wonder, what does Window do?
>
>
> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <michael@databricks.com>
> wrote:
>
>> Thinking out loud is good :)
>>
>> You are right in that anytime you ask for a global ordering from Spark
>> you will pay the cost of figuring out the range boundaries for partitions.
>> If you say orderBy, though, we aren't sure that you aren't expecting a
>> global order.
>>
>> If you only want to make sure that items are colocated, it is cheaper to
>> do a groupByKey followed by a flatMapGroups
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>
>> .
>>
>>
>>
>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers <koert@tresata.com> wrote:
>>
>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>> then do mapPartitions?
>>>
>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>
>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers <koert@tresata.com>
>>> wrote:
>>>
>>>> thats an interesting thought about orderBy and mapPartitions. i guess i
>>>> could emulate a groupBy with secondary sort using those two. however isn't
>>>> using an orderBy expensive since it is a total sort? i mean a groupBy with
>>>> secondary sort is also a total sort under the hood, but its on
>>>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>>>> therefore can be implemented more efficiently.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
>>>> michael@databricks.com> wrote:
>>>>
>>>>> It is still unclear to me why we should remember all these tricks (or
>>>>>> add lots of extra little functions) when this elegantly can be expressed
in
>>>>>> a reduce operation with a simple one line lamba function.
>>>>>>
>>>>> I think you can do that too.  KeyValueGroupedDataset has a
>>>>> reduceGroups function.  This probably won't be as fast though because
you
>>>>> end up creating objects where as the version I gave will get codgened
to
>>>>> operate on binary data the whole way though.
>>>>>
>>>>>> The same applies to these Window functions. I had to read it 3 times
>>>>>> to understand what it all means. Maybe it makes sense for someone
who has
>>>>>> been forced to use such limited tools in sql for many years but that's
not
>>>>>> necessary what we should aim for. Why can I not just have the sortBy
and
>>>>>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>>>>>
>>>>> We also have orderBy and mapPartitions.
>>>>>
>>>>>> All these functions (rank etc.) can be trivially expressed in this,
>>>>>> plus I can add other operations if needed, instead of being locked
in like
>>>>>> this Window framework.
>>>>>>
>>>>>  I agree that window functions would probably not be my first choice
>>>>> for many problems, but for people coming from SQL it was a very popular
>>>>> feature.  My real goal is to give as many paradigms as possible in a
single
>>>>> unified framework.  Let people pick the right mode of expression for
any
>>>>> given job :)
>>>>>
>>>>
>>>>
>>>
>>
>

Mime
View raw message