spark-dev mailing list archives

From Chris Martin <ch...@cmartinit.co.uk>
Subject Re: Thoughts on dataframe cogroup?
Date Thu, 18 Apr 2019 11:18:26 GMT
Yes, totally agreed with Li here.

For clarity, I'm happy to do the work to implement this, but it would be
good to get feedback from the community in general and some of the Spark
committers in particular.

thanks,

Chris

On Wed, Apr 17, 2019 at 9:17 PM Li Jin <ice.xelloss@gmail.com> wrote:

> I have left some comments. This looks like a good proposal to me.
>
> As a heavy pyspark user, this is a pattern that we see over and over again
> and I think it could be pretty high value to other pyspark users as well.
> The fact that Chris and I came to the same ideas sort of verifies my
> intuition. Also, this isn't really something new: RDD has had a cogroup
> function from very early on.
>
> With that being said, I'd like to call out again for community's feedback
> on the proposal.
>
> On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <chris@cmartinit.co.uk>
> wrote:
>
>> Ah sorry- I've updated the link which should give you access.  Can you
>> try again now?
>>
>> thanks,
>>
>> Chris
>>
>>
>>
>> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ice.xelloss@gmail.com> wrote:
>>
>>> Hi Chris,
>>>
>>> Thanks! The permission to the google doc is maybe not set up properly. I
>>> cannot view the doc by default.
>>>
>>> Li
>>>
>>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <chris@cmartinit.co.uk>
>>> wrote:
>>>
>>>> I've updated the jira so that the main body is now inside a google
>>>> doc.  Anyone should be able to comment- if you want/need write access
>>>> please drop me a mail and I can add you.
>>>>
>>>> Ryan- regarding your specific point about why I'm not proposing to
>>>> add this to the Scala API, I think the main point is that Scala users can
>>>> already use Cogroup for Datasets. For Scala this is probably a better
>>>> solution as (as far as I know) there is no Scala DataFrame library that
>>>> could be used in place of Pandas for manipulating local DataFrames. As a
>>>> result you'd probably be left dealing with Iterators of Row objects,
>>>> which almost certainly isn't what you'd want. This is similar to the
>>>> existing grouped map Pandas UDFs, for which there is no equivalent Scala API.
>>>>
>>>> I do think there might be a place for allowing a (Scala) Dataset
>>>> Cogroup to take some sort of grouping expression as the grouping key (this
>>>> would mean that you wouldn't have to marshal the key into a JVM object and
>>>> could possibly lend itself to some Catalyst optimisations) but I don't
>>>> think that this should be done as part of this SPIP.
>>>>
>>>> thanks,
>>>>
>>>> Chris
>>>>
>>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rblue@netflix.com> wrote:
>>>>
>>>>> I agree, it would be great to have a document to comment on.
>>>>>
>>>>> The main thing that stands out right now is that this is only for
>>>>> PySpark and states that it will not be added to the Scala API. Why not
>>>>> make this available since most of the work would be done?
>>>>>
>>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>
>>>>>> Thank you Chris, this looks great.
>>>>>>
>>>>>> Would you mind sharing a Google Doc version of the proposal? I believe
>>>>>> that's the preferred way of discussing proposals (other people, please
>>>>>> correct me if I am wrong).
>>>>>>
>>>>>> Li
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 8:20 AM <chris@cmartinit.co.uk> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>>>
>>>>>>> All feedback welcome!
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <chris@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>> Thanks Bryan and Li, that is much appreciated. Hopefully I should
>>>>>>> have the SPIP ready in the next couple of days.
>>>>>>>
>>>>>>> thanks,
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutlerb@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>>> be too difficult to extend the current functionality to transfer
>>>>>>>> multiple DataFrames. For the SPIP, I would keep it more high-level
>>>>>>>> and I don't think it's necessary to include details of the Python
>>>>>>>> worker; we can hash that out after the SPIP is approved.
>>>>>>>>
>>>>>>>> Bryan
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xelloss@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Chris, look forward to it.
>>>>>>>>>
>>>>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>>>>> some changes but shouldn't be too difficult. We can probably do
>>>>>>>>> something like:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>>
>>>>>>>>> In:
>>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>>
>>>>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>>>>>
>>>>>>>>> Li
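
The framing suggested above (a count, followed by each DataFrame's serialized bytes) could be sketched roughly as follows. This is only an illustration under stated assumptions: plain byte strings stand in for Arrow streams, a per-payload length prefix is added even though the message does not specify one, and `write_frames` / `read_frames` are hypothetical names that do not exist in ArrowPythonRunner.

```python
import io
import struct


def write_frames(out, payloads):
    """Write [count][len][payload][len][payload]... to a binary stream.

    The 4-byte big-endian length prefix per payload is an assumption made
    for this sketch; it is not part of the proposal in the thread.
    """
    out.write(struct.pack(">i", len(payloads)))
    for payload in payloads:
        out.write(struct.pack(">i", len(payload)))
        out.write(payload)


def read_frames(inp):
    """Read back the list of payloads written by write_frames."""
    (count,) = struct.unpack(">i", inp.read(4))
    payloads = []
    for _ in range(count):
        (size,) = struct.unpack(">i", inp.read(4))
        payloads.append(inp.read(size))
    return payloads


# Round-trip two stand-in payloads through an in-memory stream.
buf = io.BytesIO()
write_frames(buf, [b"first-dataframe-bytes", b"second-dataframe-bytes"])
buf.seek(0)
frames = read_frames(buf)
```

Because the reader learns the number of payloads up front, the same framing would extend to more than two inputs without further changes.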
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <chris@cmartinit.co.uk> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>>>> working on a SPIP to formally propose this. One concern I do have,
>>>>>>>>>> however, is that the current arrow serialization code is tied to
>>>>>>>>>> passing through a single dataframe as the udf parameter and so any
>>>>>>>>>> modification to allow multiple dataframes may not be
>>>>>>>>>> straightforward. If anyone has any ideas as to how this might be
>>>>>>>>>> achieved in an elegant manner I’d be happy to hear them!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Chris
>>>>>>>>>>
>>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>>>> cases for cogroup.
>>>>>>>>>>
>>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>>>> things like collect list and reshaping data between Spark and
>>>>>>>>>> Pandas. Cogroup feels very similar and can be an extension to the
>>>>>>>>>> groupby apply + pandas UDF functionality.
>>>>>>>>>>
>>>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <chris@cmartinit.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just to add to this I’ve also implemented my own cogroup
>>>>>>>>>>> previously and would welcome a cogroup for dataframe.
>>>>>>>>>>>
>>>>>>>>>>> My specific use case was that I had a large amount of time
>>>>>>>>>>> series data. Spark has very limited support for time series
>>>>>>>>>>> (specifically as-of joins), but pandas has good support.
>>>>>>>>>>>
>>>>>>>>>>> My solution was to take my two dataframes and perform a group by
>>>>>>>>>>> and collect list on each. The resulting arrays could be passed
>>>>>>>>>>> into a udf where they could be marshaled into a couple of pandas
>>>>>>>>>>> dataframes and processed using pandas' excellent time series
>>>>>>>>>>> functionality.
>>>>>>>>>>>
>>>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>>>> been a bit nicer. The ideal would have been some pandas udf
>>>>>>>>>>> version of cogroup that gave me a pandas dataframe for each spark
>>>>>>>>>>> dataframe in the cogroup!
>>>>>>>>>>>
>>>>>>>>>>> Chris
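
To illustrate the kind of per-key work described above, here is a small pure-Python stand-in for an as-of join: for each left row, take the most recent right row at or before its timestamp. In practice this step would be done with pandas on the two per-key dataframes; the function name `asof_join` and the sample data are invented for the sketch.

```python
from bisect import bisect_right


def asof_join(left, right):
    """Attach to each left row the latest right value with timestamp <= left timestamp.

    Both inputs are lists of (timestamp, value) tuples sorted by timestamp.
    Left rows with no earlier-or-equal right row get None.
    """
    right_ts = [ts for ts, _ in right]
    joined = []
    for ts, value in left:
        # Number of right rows whose timestamp is <= ts.
        idx = bisect_right(right_ts, ts)
        match = right[idx - 1][1] if idx > 0 else None
        joined.append((ts, value, match))
    return joined


# Hypothetical data for a single key: both sides are already grouped and sorted.
trades = [(1, "t1"), (5, "t2"), (10, "t3")]
quotes = [(2, "q1"), (5, "q2"), (7, "q3")]
result = asof_join(trades, quotes)
```

With a native cogroup, a function like this would receive the two groups for one key directly, without the collect list and marshaling steps.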
>>>>>>>>>>>
>>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> For info, in our team we have defined our own cogroup on
>>>>>>>>>>> dataframe in the past on different projects using different
>>>>>>>>>>> methods (rdd[row] based or union all collect list based).
>>>>>>>>>>>
>>>>>>>>>>> I might be biased, but I find the approach very useful in projects
>>>>>>>>>>> to simplify and speed up transformations, and remove a lot of
>>>>>>>>>>> intermediate stages (distinct + join => just cogroup).
>>>>>>>>>>>
>>>>>>>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data.
>>>>>>>>>>> That's a win!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xelloss@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am wondering whether other people have opinions or use cases
>>>>>>>>>>>> for cogroup?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xelloss@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>>>> (1) An equality outer join will give n * m rows for each key (n
>>>>>>>>>>>>> and m being the corresponding number of rows in df1 and df2 for
>>>>>>>>>>>>> each key).
>>>>>>>>>>>>> (2) The user needs to do some extra processing to transform the
>>>>>>>>>>>>> n * m rows back to the desired shape (two sub-dataframes with n
>>>>>>>>>>>>> and m rows).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for
>>>>>>>>>>>>> each key, why join them first and then unjoin them?
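
The row-count argument above can be checked with a small pure-Python sketch. It is not Spark code: rows are plain `(key, value)` tuples, and `full_outer_join` and `cogroup` are illustrative helpers written for this example only.

```python
from collections import defaultdict
from itertools import product


def group_by_key(rows):
    """Collect (key, value) rows into {key: [values]}."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return groups


def full_outer_join(rows1, rows2):
    """Equality full outer join: n * m rows for a key present on both sides."""
    g1, g2 = group_by_key(rows1), group_by_key(rows2)
    joined = []
    for key in sorted(set(g1) | set(g2)):
        left = g1.get(key) or [None]   # a missing side contributes one null row
        right = g2.get(key) or [None]
        joined.extend((key, a, b) for a, b in product(left, right))
    return joined


def cogroup(rows1, rows2):
    """Cogroup: per key, the n left values and the m right values, kept separate."""
    g1, g2 = group_by_key(rows1), group_by_key(rows2)
    return {key: (g1.get(key, []), g2.get(key, []))
            for key in sorted(set(g1) | set(g2))}


df1 = [("a", 1), ("a", 2), ("a", 3), ("b", 4)]
df2 = [("a", 10), ("a", 20), ("c", 30)]

joined = full_outer_join(df1, df2)
grouped = cogroup(df1, df2)
rows_for_a_in_join = sum(1 for key, _, _ in joined if key == "a")
```

For key "a" the join materializes 3 * 2 pairs, whereas the cogroup keeps the 3 left values and the 2 right values as two separate lists.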
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is
>>>>>>>>>>>>>> different than the cogroup you propose.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now you apply a udf on each iterable, one per key value
>>>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>>>>>>>>> 2) applying "groupby" on that key,
>>>>>>>>>>>>>> 3) finally applying a udaf (you can have a look here if you are
>>>>>>>>>>>>>> not familiar with them
>>>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xelloss@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit
>>>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our
>>>>>>>>>>>>>>> existing pandas-heavy libraries.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>>>> groupby().apply() is not sufficient. In some cases, we want to
>>>>>>>>>>>>>>> group two dataframes by the same key, and apply a function which
>>>>>>>>>>>>>>> takes two pd.DataFrame (and also returns a pd.DataFrame) for each
>>>>>>>>>>>>>>> key. This feels very much like the "cogroup" operation in the
>>>>>>>>>>>>>>> RDD API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It would be great to be able to do something like this (not the
>>>>>>>>>>>>>>> actual API, just to explain the use case):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original
>>>>>>>>>>>>>>>      # dataframes that are associated with a particular key
>>>>>>>>>>>>>>>      result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>>      return result
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the
>>>>>>>>>>>>>>> same pattern and it is hard to get it to work by using joins.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Li
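
The shape of the operation requested above can be sketched without Spark or pandas. In this toy version each "dataframe" is a list of dict rows, and `cogroup_apply` and `summarize` are hypothetical helpers written for illustration; they are not a proposed or existing PySpark API.

```python
from collections import defaultdict


def cogroup_apply(rows1, rows2, key, func):
    """Group two lists of dict rows by `key` and call func(group1, group2) per key.

    func returns a list of output rows; the results for all keys are concatenated.
    A key missing from one side yields an empty list for that side.
    """
    groups1, groups2 = defaultdict(list), defaultdict(list)
    for row in rows1:
        groups1[row[key]].append(row)
    for row in rows2:
        groups2[row[key]].append(row)

    output = []
    for k in sorted(set(groups1) | set(groups2)):
        output.extend(func(groups1.get(k, []), groups2.get(k, [])))
    return output


def summarize(group1, group2):
    # Example per-key function: report how many rows each side contributed.
    some_key = (group1 or group2)[0]["some_key"]
    return [{"some_key": some_key, "n_left": len(group1), "n_right": len(group2)}]


df1 = [{"some_key": "a", "x": 1}, {"some_key": "a", "x": 2}, {"some_key": "b", "x": 3}]
df2 = [{"some_key": "a", "y": 10}, {"some_key": "c", "y": 20}]
df3 = cogroup_apply(df1, df2, "some_key", summarize)
```

The per-key function sees both groups at once and in their original shape, which is the property that is hard to recover after a join.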
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
