spark-dev mailing list archives

From Chris Martin <ch...@cmartinit.co.uk>
Subject Re: Thoughts on dataframe cogroup?
Date Tue, 09 Apr 2019 12:22:59 GMT
Thanks Bryan and Li, that is much appreciated. I should hopefully have the
SPIP ready in the next couple of days.

thanks,

Chris




On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutlerb@gmail.com> wrote:

> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too
> difficult to extend the current functionality to transfer multiple
> DataFrames.  For the SPIP, I would keep it more high-level and I don't
> think it's necessary to include details of the Python worker, we can hash
> that out after the SPIP is approved.
>
> Bryan
>
> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xelloss@gmail.com> wrote:
>
>> Thanks Chris, look forward to it.
>>
>> I think sending multiple dataframes to the Python worker requires some
>> changes but shouldn't be too difficult. We could probably do something like:
>>
>>
>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>
>> In:
>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>
>> And have ArrowPythonRunner take multiple input iterators/schemas.
>>
>> Li
>>
>>
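A minimal sketch of the framing Li describes: the number of dataframes first, then each dataframe in turn. It assumes big-endian 32-bit integers for the count and for a per-dataframe length prefix, and it uses placeholder byte strings where ArrowPythonRunner would write serialized Arrow data. `write_frames` and `read_frames` are hypothetical helpers, not Spark code. Since an Arrow IPC stream carries its own end-of-stream marker, the length prefix may turn out to be unnecessary in practice.

```python
import io
import struct


def write_frames(payloads):
    """Serialize [count][len0][bytes0][len1][bytes1]... into one byte string.

    Each payload stands in for one dataframe's serialized Arrow data; the
    big-endian int32 count and length prefixes are assumptions of this sketch.
    """
    buf = io.BytesIO()
    buf.write(struct.pack(">i", len(payloads)))  # numberOfDataFrames
    for payload in payloads:
        buf.write(struct.pack(">i", len(payload)))  # hypothetical length prefix
        buf.write(payload)
    return buf.getvalue()


def read_frames(data):
    """Inverse of write_frames, as a Python worker might read the input."""
    stream = io.BytesIO(data)
    (count,) = struct.unpack(">i", stream.read(4))
    frames = []
    for _ in range(count):
        (length,) = struct.unpack(">i", stream.read(4))
        frames.append(stream.read(length))
    return frames


print(read_frames(write_frames([b"first-df", b"second-df"])))
# [b'first-df', b'second-df']
```

Putting the count first tells the reader how many dataframes to expect before it consumes any data.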
>> On Mon, Apr 8, 2019 at 5:55 AM <chris@cmartinit.co.uk> wrote:
>>
>>> Hi,
>>>
>>> Just to say, I really do think this is useful and am currently working
>>> on a SPIP to formally propose this. One concern I do have, however, is that
>>> the current arrow serialization code is tied to passing through a single
>>> dataframe as the udf parameter and so any modification to allow multiple
>>> dataframes may not be straightforward.  If anyone has any ideas as to how
>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>
>>> Thanks,
>>>
>>> Chris
>>>
>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xelloss@gmail.com> wrote:
>>>
>>> Thank you both for the reply. Chris and I have very similar use cases
>>> for cogroup.
>>>
>>> One of the goals for groupby apply + pandas UDF was to avoid things like
>>> collect list and reshaping data between Spark and Pandas. Cogroup feels
>>> very similar and can be an extension to the groupby apply + pandas UDF
>>> functionality.
>>>
>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>
>>> On Tue, Feb 26, 2019 at 2:17 AM <chris@cmartinit.co.uk> wrote:
>>>
>>>> Just to add to this, I’ve also implemented my own cogroup previously and
>>>> would welcome a cogroup for dataframes.
>>>>
>>>> My specific use case was that I had a large amount of time series data.
>>>> Spark has very limited support for time series (specifically as-of joins),
>>>> but pandas has good support.
>>>>
>>>> My solution was to take my two dataframes and perform a group by and
>>>> collect list on each. The resulting arrays could be passed into a udf where
>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>> using pandas' excellent time series functionality.
>>>>
>>>> If cogroup were available natively on dataframes, this would have been a
>>>> bit nicer. The ideal would have been some pandas udf version of cogroup
>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>
>>>> Chris
>>>>
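The per-key step Chris describes is essentially an as-of join. The sketch below is a standard-library stand-in for what `pandas.merge_asof` does with its default backward direction: each left row is paired with the most recent right row whose timestamp is less than or equal to its own. It assumes each key's rows arrive as lists of dicts with a numeric `time` field (a hypothetical layout, e.g. one key's collected lists), and `asof_join` is a hypothetical helper.

```python
from bisect import bisect_right


def asof_join(left, right, on="time"):
    """Pair each left row with the latest right row where right[on] <= left[on].

    Both inputs are the rows for a single key; unmatched left rows get None.
    """
    right = sorted(right, key=lambda r: r[on])  # bisect needs sorted times
    right_times = [r[on] for r in right]
    result = []
    for row in sorted(left, key=lambda r: r[on]):
        i = bisect_right(right_times, row[on])  # index just past equal times
        result.append((row, right[i - 1] if i > 0 else None))
    return result


trades = [{"time": 1}, {"time": 5}, {"time": 10}]
quotes = [{"time": 2, "px": 100}, {"time": 5, "px": 101}, {"time": 8, "px": 102}]
for trade, quote in asof_join(trades, quotes):
    print(trade["time"], quote and quote["px"])
```

Using `bisect_right` means an exact timestamp match counts as "at or before", mirroring pandas' default of allowing exact matches.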
>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.winandy@gmail.com>
>>>> wrote:
>>>>
>>>> For info, our team has defined its own cogroup on dataframes in the
>>>> past, on different projects, using different methods (RDD[Row] based, or
>>>> union all + collect list based).
>>>>
>>>> I might be biased, but I find the approach very useful in projects to
>>>> simplify and speed up transformations, and to remove a lot of intermediate
>>>> stages (distinct + join => just cogroup).
>>>>
>>>> Plus, Spark 2.4 introduced a lot of new operators for nested data. That's
>>>> a win!
>>>>
>>>>
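One way to read Jonathan's "union all collect list" approach: tag every row with the side it came from, union the two inputs, group by key, then split each collected group back apart by tag. Below is a plain-Python sketch of that idea over lists of dicts; in a DataFrame version the tag would presumably be a literal column added before the union. `cogroup_via_union` is a hypothetical name.

```python
from collections import defaultdict


def cogroup_via_union(left, right, key):
    """Return {key_value: (left_rows, right_rows)} using the tag/union/group idea."""
    # 1. Tag each row with its origin: 0 = left input, 1 = right input.
    tagged = [(0, row) for row in left] + [(1, row) for row in right]
    # 2. Group the unioned rows by key (the "collect list" step).
    collected = defaultdict(list)
    for side, row in tagged:
        collected[row[key]].append((side, row))
    # 3. Split each collected list back into its two sides.
    return {
        k: ([r for s, r in rows if s == 0], [r for s, r in rows if s == 1])
        for k, rows in collected.items()
    }


orders = [{"id": 1, "qty": 3}, {"id": 1, "qty": 4}, {"id": 2, "qty": 1}]
refunds = [{"id": 1, "amt": 9.5}, {"id": 3, "amt": 2.0}]
groups = cogroup_via_union(orders, refunds, "id")
print({k: (len(l), len(r)) for k, (l, r) in groups.items()})
# {1: (2, 1), 2: (1, 0), 3: (0, 1)}
```

Each key yields exactly one pair of row lists, so there is no per-key n * m blow-up; the trade-off is that each group is materialized in full.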
>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xelloss@gmail.com> wrote:
>>>>
>>>>> I am wondering whether other people have opinions/use cases for cogroup.
>>>>>
>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>
>>>>>> Alessandro,
>>>>>>
>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>> full outer join".
>>>>>>
>>>>>> Two issues I see with an equality outer join are:
>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>> being the corresponding number of rows in df1 and df2 for that key)
>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>> rows back to the desired shape (two sub-dataframes with n and m rows)
>>>>>>
>>>>>> I think a full outer join is an inefficient way to implement cogroup.
>>>>>> If the end goal is to have two separate dataframes for each key, why
>>>>>> join them first and then unjoin them?
>>>>>>
>>>>>>
>>>>>>
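To make point (1) concrete, here is a naive equality full outer join over lists of dicts, with unmatched rows paired with None the way SQL pads with nulls. It is an illustrative, quadratic sketch (`full_outer_join` is a hypothetical helper), but it shows the row counts: a key with n rows on the left and m on the right produces n * m joined rows, whereas a cogroup would hand the function those n and m rows directly.

```python
from itertools import product


def full_outer_join(left, right, key):
    """Naive equality full outer join; a missing side is represented by None."""
    keys = {row[key] for row in left} | {row[key] for row in right}
    joined = []
    for k in keys:
        left_rows = [row for row in left if row[key] == k] or [None]
        right_rows = [row for row in right if row[key] == k] or [None]
        # Cross product: n * m pairs when both sides have rows for this key.
        joined.extend(product(left_rows, right_rows))
    return joined


df1 = [{"k": "a", "x": i} for i in range(3)]                         # n = 3 for "a"
df2 = [{"k": "a", "y": j} for j in range(2)] + [{"k": "b", "y": 9}]  # m = 2 for "a"
print(len(full_outer_join(df1, df2, "k")))  # 3 * 2 for "a" + 1 for "b" -> 7
```

Recovering the original three and two rows for key "a" from those six pairs then needs a de-duplication step, which cannot tell genuinely duplicated input rows apart from join-induced copies.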
>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>> alessandro.solimando@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> I fail to see how an equi-join on the key columns is different than
>>>>>>> the cogroup you propose.
>>>>>>>
>>>>>>> I think the accepted answer can shed some light:
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>
>>>>>>> Now you apply a UDF to each iterable, one per key value (obtained
>>>>>>> with cogroup).
>>>>>>>
>>>>>>> You can achieve the same by:
>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>> 2) applying "groupby" on that key, and
>>>>>>> 3) finally applying a UDAF (you can have a look here if you are not
>>>>>>> familiar with them:
>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>> which will process each group "in isolation".
>>>>>>>
>>>>>>> HTH,
>>>>>>> Alessandro
>>>>>>>
>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xelloss@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We have been using PySpark's groupby().apply() quite a bit, and it
>>>>>>>> has been very helpful in integrating Spark with our existing
>>>>>>>> pandas-heavy libraries.
>>>>>>>>
>>>>>>>> Recently, we have found more and more cases where groupby().apply()
>>>>>>>> is not sufficient. In some cases, we want to group two dataframes by
>>>>>>>> the same key and apply a function which takes two pd.DataFrames (and
>>>>>>>> also returns a pd.DataFrame) for each key. This feels very much like
>>>>>>>> the "cogroup" operation in the RDD API.
>>>>>>>>
>>>>>>>> It would be great to be able to do something like this (not an
>>>>>>>> actual API, just to explain the use case):
>>>>>>>>
>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>     # that are associated with a particular key
>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>     return result
>>>>>>>>
>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>
>>>>>>>> I have searched around the problem, and some people have suggested
>>>>>>>> joining the tables first. However, it's often not the same pattern,
>>>>>>>> and it's hard to get it to work using joins.
>>>>>>>>
>>>>>>>> I wonder what people's thoughts are on this.
>>>>>>>>
>>>>>>>> Li
>>>>>>>>
>>>>>>>>
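The semantics Li is asking for can be sketched without Spark: group both inputs by the key and call the user function once per key with that key's rows from each side, concatenating the results. The sketch below uses lists of dicts in place of pd.DataFrame so it runs with the standard library alone. `cogroup_apply` and `summarize` are hypothetical names, and this is not a PySpark API.

```python
from collections import defaultdict


def cogroup_apply(left, right, key, func):
    """Call func(left_rows, right_rows) once per key present in either input
    and concatenate the row lists it returns."""
    groups = defaultdict(lambda: ([], []))
    for row in left:
        groups[row[key]][0].append(row)
    for row in right:
        groups[row[key]][1].append(row)
    result = []
    for k in sorted(groups):  # sorted only to make the output deterministic
        result.extend(func(*groups[k]))
    return result


def summarize(rows1, rows2):
    # Hypothetical per-key function; at least one side is always non-empty.
    k = (rows1 or rows2)[0]["some_key"]
    return [{"some_key": k, "n1": len(rows1), "n2": len(rows2)}]


df1 = [{"some_key": 1, "v": 1.0}, {"some_key": 1, "v": 2.0}, {"some_key": 2, "v": 3.0}]
df2 = [{"some_key": 1, "w": "x"}, {"some_key": 3, "w": "y"}]
df3 = cogroup_apply(df1, df2, "some_key", summarize)
```

Keys present on only one side still reach `func`, with an empty list for the other side, which matches how RDD cogroup covers every key from both inputs.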
