spark-dev mailing list archives

From Bryan Cutler <cutl...@gmail.com>
Subject Re: Thoughts on dataframe cogroup?
Date Tue, 23 Apr 2019 23:17:24 GMT
Apologies for not leaving feedback yet. I'm a little swamped this week with
the Spark Summit, but this is at the top of my list to get to for next week.

Bryan

On Thu, Apr 18, 2019 at 4:18 AM Chris Martin <chris@cmartinit.co.uk> wrote:

> Yes, totally agreed with Li here.
>
> For clarity, I'm happy to do the work to implement this, but it would be
> good to get feedback from the community in general and some of the Spark
> committers in particular.
>
> thanks,
>
> Chris
>
> On Wed, Apr 17, 2019 at 9:17 PM Li Jin <ice.xelloss@gmail.com> wrote:
>
>> I have left some comments. This looks like a good proposal to me.
>>
>> As a heavy PySpark user, this is a pattern that we see over and over
>> again, and I think it could be pretty high value to other PySpark users as
>> well. The fact that Chris and I came to the same ideas sort of verifies my
>> intuition. Also, this isn't really something new: the RDD API has had a
>> cogroup function from very early on.
>>
>> With that being said, I'd like to call again for the community's feedback
>> on the proposal.
>>
>> On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <chris@cmartinit.co.uk>
>> wrote:
>>
>>> Ah, sorry, I've updated the link, which should give you access. Can you
>>> try again now?
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>>
>>>
>>> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ice.xelloss@gmail.com> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> Thanks! The permissions on the Google doc may not be set up properly:
>>>> I cannot view the doc by default.
>>>>
>>>> Li
>>>>
>>>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <chris@cmartinit.co.uk>
>>>> wrote:
>>>>
>>>>> I've updated the JIRA so that the main body is now inside a Google
>>>>> doc. Anyone should be able to comment; if you want or need write access,
>>>>> please drop me a mail and I can add you.
>>>>>
>>>>> Ryan, regarding your specific point about why I'm not proposing to
>>>>> add this to the Scala API: I think the main point is that Scala users can
>>>>> already use cogroup for Datasets. For Scala this is probably a better
>>>>> solution, as (as far as I know) there is no Scala DataFrame library that
>>>>> could be used in place of pandas for manipulating local DataFrames. As a
>>>>> result, you'd probably be left dealing with Iterators of Row objects,
>>>>> which almost certainly isn't what you'd want. This is similar to the
>>>>> existing grouped map Pandas UDFs, for which there is no equivalent Scala
>>>>> API.
>>>>>
>>>>> I do think there might be a place for allowing a (Scala) Dataset
>>>>> cogroup to take some sort of grouping expression as the grouping key (this
>>>>> would mean that you wouldn't have to marshal the key into a JVM object and
>>>>> could possibly lend itself to some Catalyst optimisations), but I don't
>>>>> think that this should be done as part of this SPIP.
>>>>>
>>>>> thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rblue@netflix.com> wrote:
>>>>>
>>>>>> I agree, it would be great to have a document to comment on.
>>>>>>
>>>>>> The main thing that stands out right now is that this is only for
>>>>>> PySpark and states that it will not be added to the Scala API. Why not make
>>>>>> this available since most of the work would be done?
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you Chris, this looks great.
>>>>>>>
>>>>>>> Would you mind sharing a Google doc version of the proposal? I believe
>>>>>>> that's the preferred way of discussing proposals (other people, please
>>>>>>> correct me if I am wrong).
>>>>>>>
>>>>>>> Li
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 8:20 AM <chris@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> As promised, I’ve raised SPARK-27463 for this.
>>>>>>>>
>>>>>>>> All feedback welcome!
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <chris@cmartinit.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks Bryan and Li, that is much appreciated. Hopefully I should
>>>>>>>> have the SPIP ready in the next couple of days.
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutlerb@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>>>> be too difficult to extend the current functionality to transfer multiple
>>>>>>>>> DataFrames. For the SPIP, I would keep it more high-level, and I don't
>>>>>>>>> think it's necessary to include details of the Python worker; we can hash
>>>>>>>>> that out after the SPIP is approved.
>>>>>>>>>
>>>>>>>>> Bryan
>>>>>>>>>
>>>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xelloss@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Chris, I look forward to it.
>>>>>>>>>>
>>>>>>>>>> I think sending multiple dataframes to the Python worker requires
>>>>>>>>>> some changes but shouldn't be too difficult. We can probably do something
>>>>>>>>>> like:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>>>
>>>>>>>>>> In:
>>>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>>>
>>>>>>>>>> And have ArrowPythonRunner take multiple input iterators/schemas.
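A minimal sketch of the `[numberOfDataFrames][First...][Second...]` framing Li describes, under two assumptions that are not taken from ArrowPythonRunner: each DataFrame's Arrow data has already been serialized to bytes (plain byte strings stand in for real Arrow IPC streams here), and every count and payload is prefixed with a 4-byte big-endian length. A real implementation might not need the per-payload length, since an Arrow IPC stream marks its own end.

```python
import io
import struct


def write_frames(out, payloads):
    """Write [count][len1][payload1][len2][payload2]... to a binary stream.

    Assumption: payloads are already-serialized bytes (stand-ins for Arrow data).
    """
    out.write(struct.pack(">i", len(payloads)))  # number of DataFrames
    for payload in payloads:
        out.write(struct.pack(">i", len(payload)))  # length prefix (assumed)
        out.write(payload)


def read_exactly(inp, n):
    """Read exactly n bytes or fail loudly on a truncated stream."""
    data = inp.read(n)
    if len(data) != n:
        raise EOFError(f"expected {n} bytes, got {len(data)}")
    return data


def read_frames(inp):
    """Inverse of write_frames: return the list of payloads in order."""
    (count,) = struct.unpack(">i", read_exactly(inp, 4))
    payloads = []
    for _ in range(count):
        (size,) = struct.unpack(">i", read_exactly(inp, 4))
        payloads.append(read_exactly(inp, size))
    return payloads


buf = io.BytesIO()
write_frames(buf, [b"left-arrow-stream", b"right-arrow-stream"])
buf.seek(0)
print(read_frames(buf))
```

On the Python side, the worker would read the count first and then hand one deserialized table per payload to the UDF.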
>>>>>>>>>>
>>>>>>>>>> Li
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <chris@cmartinit.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>>>>> working on a SPIP to formally propose this. One concern I do have, however,
>>>>>>>>>>> is that the current Arrow serialization code is tied to passing through a
>>>>>>>>>>> single dataframe as the UDF parameter, and so any modification to allow
>>>>>>>>>>> multiple dataframes may not be straightforward. If anyone has any ideas as
>>>>>>>>>>> to how this might be achieved in an elegant manner, I’d be happy to hear
>>>>>>>>>>> them!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Chris
>>>>>>>>>>>
>>>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>>>>> cases for cogroup.
>>>>>>>>>>>
>>>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>>>>> things like collect_list and reshaping data between Spark and pandas.
>>>>>>>>>>> Cogroup feels very similar and can be an extension of the groupby apply +
>>>>>>>>>>> pandas UDF functionality.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if any PMC members/committers have any thoughts/opinions on
>>>>>>>>>>> this?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <chris@cmartinit.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just to add to this, I’ve also implemented my own cogroup
>>>>>>>>>>>> previously and would welcome a cogroup for dataframes.
>>>>>>>>>>>>
>>>>>>>>>>>> My specific use case was that I had a large amount of time
>>>>>>>>>>>> series data. Spark has very limited support for time series (specifically
>>>>>>>>>>>> as-of joins), but pandas has good support.
>>>>>>>>>>>>
>>>>>>>>>>>> My solution was to take my two dataframes and perform a groupBy
>>>>>>>>>>>> and collect_list on each. The resulting arrays could be passed into a
>>>>>>>>>>>> UDF where they could be marshaled into a couple of pandas dataframes and
>>>>>>>>>>>> processed using pandas' excellent time series functionality.
>>>>>>>>>>>>
>>>>>>>>>>>> If cogroup were available natively on dataframes, this would have
>>>>>>>>>>>> been a bit nicer. The ideal would have been some pandas UDF version of
>>>>>>>>>>>> cogroup that gave me a pandas dataframe for each Spark dataframe in the
>>>>>>>>>>>> cogroup!
>>>>>>>>>>>>
>>>>>>>>>>>> Chris
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.winandy@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> For info, our team has defined its own cogroup on dataframes
>>>>>>>>>>>> in the past on different projects, using different methods (RDD[Row]-based
>>>>>>>>>>>> or union all + collect list based).
>>>>>>>>>>>>
>>>>>>>>>>>> I might be biased, but I find the approach very useful in projects
>>>>>>>>>>>> to simplify and speed up transformations and remove a lot of intermediate
>>>>>>>>>>>> stages (distinct + join => just cogroup).
>>>>>>>>>>>>
>>>>>>>>>>>> Plus, Spark 2.4 introduced a lot of new operators for nested
>>>>>>>>>>>> data. That's a win!
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xelloss@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I wonder whether other people have opinions or use cases for
>>>>>>>>>>>>> cogroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join" you mean
>>>>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n
>>>>>>>>>>>>>> and m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>>>>>> (2) the user needs to do some extra processing to transform n * m
>>>>>>>>>>>>>> back to the desired shape (two sub-dataframes with n and m rows)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for each key,
>>>>>>>>>>>>>> why join them first and then unjoin them?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <alessandro.solimando@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is
>>>>>>>>>>>>>>> different from the cogroup you propose.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now you apply a UDF on each iterable, one per key value
>>>>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>>>>>>>>>> 2) applying "groupby" on that key,
>>>>>>>>>>>>>>> 3) finally applying a UDAF (you can have a look here if you are
>>>>>>>>>>>>>>> not familiar with them:
>>>>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>>>>> which will process each group "in isolation".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit,
>>>>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our existing
>>>>>>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>>>>> groupby().apply() is not sufficient. In some cases, we want to group two
>>>>>>>>>>>>>>>> dataframes by the same key and apply a function which takes two
>>>>>>>>>>>>>>>> pd.DataFrames (and also returns a pd.DataFrame) for each key. This feels
>>>>>>>>>>>>>>>> very much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It would be great to be able to do something like this (not the
>>>>>>>>>>>>>>>> actual API, just to explain the use case):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original
>>>>>>>>>>>>>>>>     # dataframes that are associated with a particular key
>>>>>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>>>     return result
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have searched around the problem, and some people have
>>>>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the same
>>>>>>>>>>>>>>>> pattern, and it is hard to get it to work by using joins.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Li
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
