spark-dev mailing list archives

From Ryan Blue <rb...@netflix.com.INVALID>
Subject Re: Thoughts on dataframe cogroup?
Date Mon, 15 Apr 2019 17:26:42 GMT
I agree, it would be great to have a document to comment on.

The main thing that stands out right now is that this is only for PySpark
and states that it will not be added to the Scala API. Why not make this
available since most of the work would be done?

On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xelloss@gmail.com> wrote:

> Thank you Chris, this looks great.
>
> Would you mind sharing a Google Doc version of the proposal? I believe
> that's the preferred way of discussing proposals (other people, please
> correct me if I am wrong).
>
> Li
>
> On Mon, Apr 15, 2019 at 8:20 AM <chris@cmartinit.co.uk> wrote:
>
>> Hi,
>>
>>  As promised I’ve raised SPARK-27463 for this.
>>
>> All feedback welcome!
>>
>> Chris
>>
>> On 9 Apr 2019, at 13:22, Chris Martin <chris@cmartinit.co.uk> wrote:
>>
>> Thanks Bryan and Li, that is much appreciated. Hopefully I should have
>> the SPIP ready in the next couple of days.
>>
>> thanks,
>>
>> Chris
>>
>>
>>
>>
>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutlerb@gmail.com> wrote:
>>
>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be
>>> too difficult to extend the current functionality to transfer multiple
>>> DataFrames. For the SPIP, I would keep it more high-level. I don't
>>> think it's necessary to include details of the Python worker; we can
>>> hash that out after the SPIP is approved.
>>>
>>> Bryan
>>>
>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xelloss@gmail.com> wrote:
>>>
>>>> Thanks Chris, look forward to it.
>>>>
>>>> I think sending multiple dataframes to the Python worker requires some
>>>> changes but shouldn't be too difficult. We can probably do something
>>>> like:
>>>>
>>>>
>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>
>>>> In:
>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>
>>>> And have ArrowPythonRunner take multiple input iterators/schemas.
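
A minimal sketch of the framing idea above, in plain Python rather than the actual ArrowPythonRunner code. It writes a count followed by each payload, and reads them back. The per-payload length prefix is an assumption added here so that opaque byte strings can stand in for Arrow-formatted dataframes; the layout in the thread does not mention one. The names `write_frames` and `read_frames` are hypothetical.

```python
import io
import struct


def write_frames(out, payloads):
    """Write [count][len][payload][len][payload]... to a binary stream."""
    # numberOfDataFrames as a big-endian 4-byte signed int
    out.write(struct.pack(">i", len(payloads)))
    for payload in payloads:
        # Assumed length prefix so the reader knows where each payload ends
        out.write(struct.pack(">i", len(payload)))
        out.write(payload)


def read_frames(inp):
    """Read back the payloads written by write_frames, in order."""
    (count,) = struct.unpack(">i", inp.read(4))
    frames = []
    for _ in range(count):
        (size,) = struct.unpack(">i", inp.read(4))
        frames.append(inp.read(size))
    return frames


# Byte strings stand in for the two serialized dataframes
buf = io.BytesIO()
write_frames(buf, [b"first-df-bytes", b"second-df-bytes"])
buf.seek(0)
frames = read_frames(buf)
```

The reader side only needs the leading count to know how many dataframes to hand to the UDF, which is why the count comes first.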
>>>>
>>>> Li
>>>>
>>>>
>>>> On Mon, Apr 8, 2019 at 5:55 AM <chris@cmartinit.co.uk> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Just to say, I really do think this is useful and am currently working
>>>>> on a SPIP to formally propose this. One concern I do have, however, is
>>>>> that the current arrow serialization code is tied to passing through a
>>>>> single dataframe as the udf parameter and so any modification to allow
>>>>> multiple dataframes may not be straightforward. If anyone has any ideas
>>>>> as to how this might be achieved in an elegant manner I’d be happy to
>>>>> hear them!
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>
>>>>> Thank you both for the reply. Chris and I have very similar use cases
>>>>> for cogroup.
>>>>>
>>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>>> UDF functionality.
>>>>>
>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>
>>>>> On Tue, Feb 26, 2019 at 2:17 AM <chris@cmartinit.co.uk> wrote:
>>>>>
>>>>>> Just to add to this, I’ve also implemented my own cogroup previously
>>>>>> and would welcome a cogroup for dataframes.
>>>>>>
>>>>>> My specific use case was that I had a large amount of time series
>>>>>> data. Spark has very limited support for time series (specifically
>>>>>> as-of joins), but pandas has good support.
>>>>>>
>>>>>> My solution was to take my two dataframes and perform a group by and
>>>>>> collect list on each. The resulting arrays could be passed into a udf
>>>>>> where they could be marshaled into a couple of pandas dataframes and
>>>>>> processed using pandas' excellent time series functionality.
>>>>>>
>>>>>> If cogroup was available natively on dataframes this would have been
>>>>>> a bit nicer. The ideal would have been some pandas udf version of
>>>>>> cogroup that gave me a pandas dataframe for each spark dataframe in
>>>>>> the cogroup!
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>> jonathan.winandy@gmail.com> wrote:
>>>>>>
>>>>>> For info, in our team we have defined our own cogroup on dataframes
>>>>>> in the past on different projects using different methods (rdd[row]
>>>>>> based or union all collect list based).
>>>>>>
>>>>>> I might be biased, but I find the approach very useful in projects to
>>>>>> simplify and speed up transformations, and remove a lot of intermediate
>>>>>> stages (distinct + join => just cogroup).
>>>>>>
>>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data.
>>>>>> That's a win!
>>>>>>
>>>>>>
>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xelloss@gmail.com> wrote:
>>>>>>
>>>>>>> I am wondering whether other people have opinions or use cases for
>>>>>>> cogroup?
>>>>>>>
>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xelloss@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Alessandro,
>>>>>>>>
>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>>>> full outer join".
>>>>>>>>
>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>> (1) An equality outer join will give n * m rows for each key (n and
>>>>>>>> m being the corresponding number of rows in df1 and df2 for each
>>>>>>>> key)
>>>>>>>> (2) The user needs to do some extra processing to transform n * m
>>>>>>>> back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>
>>>>>>>> I think full outer join is an inefficient way to implement cogroup.
>>>>>>>> If the end goal is to have two separate dataframes for each key, why
>>>>>>>> join them first and then unjoin them?
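
To make the n * m point concrete, here is a small plain-Python sketch (no Spark involved) that counts how many rows a full outer equi-join would produce per key, next to the (n, m) sizes a cogroup would hand over. The function names and the sample keys are made up for illustration.

```python
from collections import Counter


def full_outer_join_rows_per_key(keys1, keys2):
    """Rows per key after a full outer equi-join: n * m when both sides
    have the key, otherwise the row count of the side that has it."""
    n, m = Counter(keys1), Counter(keys2)
    return {k: max(n[k], 1) * max(m[k], 1) for k in set(n) | set(m)}


def cogroup_sizes_per_key(keys1, keys2):
    """Per key, the sizes (n, m) of the two sub-groups a cogroup yields."""
    n, m = Counter(keys1), Counter(keys2)
    return {k: (n[k], m[k]) for k in set(n) | set(m)}


# Key column of df1 and df2 (hypothetical sample data)
keys1 = ["a", "a", "a", "b"]
keys2 = ["a", "a", "c", "c"]

join_rows = full_outer_join_rows_per_key(keys1, keys2)
group_sizes = cogroup_sizes_per_key(keys1, keys2)
```

For key "a" the join materializes 3 * 2 = 6 rows, while the cogroup passes along only 3 + 2 rows split across two groups.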
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> I fail to see how an equi-join on the key columns is different
>>>>>>>>> than the cogroup you propose.
>>>>>>>>>
>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>
>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>
>>>>>>>>> Now you apply a udf on each iterable, one per key value (obtained
>>>>>>>>> with cogroup).
>>>>>>>>>
>>>>>>>>> You can achieve the same by:
>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>>>> familiar with them
>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>
>>>>>>>>> HTH,
>>>>>>>>> Alessandro
>>>>>>>>>
>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xelloss@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit and it
>>>>>>>>>> has been very helpful in integrating Spark with our existing
>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>
>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>> groupby().apply() is not sufficient. In some cases, we want to
>>>>>>>>>> group two dataframes by the same key, and apply a function which
>>>>>>>>>> takes two pd.DataFrame (also returns a pd.DataFrame) for each key.
>>>>>>>>>> This feels very much like the "cogroup" operation in the RDD API.
>>>>>>>>>>
>>>>>>>>>> It would be great to be able to do something like this (not the
>>>>>>>>>> actual API, just to explain the use case):
>>>>>>>>>>
>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>      # that are associated with a particular key
>>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>      return result
>>>>>>>>>>
>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
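
The semantics being asked for can be sketched in plain Python, with lists of dicts standing in for dataframes. This is only an illustration of the intended behavior under that assumption, not a Spark or pandas API; `cogroup_apply` and `count_sides` are hypothetical names.

```python
from collections import defaultdict


def cogroup_apply(rows1, rows2, key, func):
    """Group both inputs by `key`, call func(part1, part2) once per key,
    and concatenate the rows that func returns."""
    groups1, groups2 = defaultdict(list), defaultdict(list)
    for row in rows1:
        groups1[row[key]].append(row)
    for row in rows2:
        groups2[row[key]].append(row)

    result = []
    # A key present on only one side gets an empty list for the other side
    for k in sorted(set(groups1) | set(groups2)):
        result.extend(func(groups1.get(k, []), groups2.get(k, [])))
    return result


def count_sides(part1, part2):
    """Example function: report how many rows each side has for the key."""
    k = (part1 or part2)[0]["some_key"]
    return [{"some_key": k, "n": len(part1), "m": len(part2)}]


df1 = [{"some_key": "a", "v": 1}, {"some_key": "a", "v": 2},
       {"some_key": "b", "v": 3}]
df2 = [{"some_key": "a", "w": 10}, {"some_key": "c", "w": 20}]

df3 = cogroup_apply(df1, df2, "some_key", count_sides)
```

The function sees the two sub-groups separately for each key, so nothing has to be joined and then split apart again.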
>>>>>>>>>>
>>>>>>>>>> I have searched around the problem and some people have suggested
>>>>>>>>>> to join the tables first. However, it's often not the same pattern
>>>>>>>>>> and hard to get it to work by using joins.
>>>>>>>>>>
>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>
>>>>>>>>>> Li
>>>>>>>>>>
>>>>>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix
