spark-issues mailing list archives

From "Hyukjin Kwon (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-26858) Vectorized gapplyCollect, Arrow optimization in native R function execution
Date Wed, 13 Feb 2019 12:14:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-26858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766941#comment-16766941 ]

Hyukjin Kwon edited comment on SPARK-26858 at 2/13/19 12:13 PM:
----------------------------------------------------------------

cc [~felixcheung], [~shivaram], [~rxin@databricks.com], [~bryanc], [~viirya]

Actually, I wonder if we should add {{gapplyCollect}} support.
Here is a rough diagram of {{gapplyCollect}}; I made it to explain why it's tricky and hacky to implement.
There are still row-by-row operations here and there in {{gapplyCollect}}, but I am not trying to explain the performance aspect here.
{code:java}
|Driver                                                              |Executor                                                                   |
|R side                        |JVM side                             |JVM side                          |R side                                  |
|------------------------------|-------------------------------------|----------------------------------|----------------------------------------|
|1. call `gapplyCollect`       |                                     |                                  |                                        |
|                              |                                     |                                  |                                        |
|2. DataFrame<R: binary> is    |                                     |                                  |                                        |
|   set for its output schema  |                                     |                                  |                                        |
|   and call `collect`         |                                     |                                  |                                        |
|                              |                                     |                                  |                                        |
|3.                            | Query plan is done and it executes  |                                  |                                        |
|                              |                                     |                                  |                                        |
|4.                            |                                     | Serialize from JVM record to     |                                        |
|                              |                                     |   R record line by line          |                                        |
|                              |                                     |                                  |                                        |
|5.                            |                                     | Send bytes                       | Receive bytes                          |
|                              |                                     |                                  |                                        |
|6.                            |                                     |                                  | Deserializes bytes to an R data frame  |
|                              |                                     |                                  |                                        |
|7.                            |                                     |                                  | Execute R function (with key)          |
|                              |                                     |                                  |                                        |
|8.                            |                                     |                                  | Serializes output R DataFrame          |
|                              |                                     |                                  |                                        |
|9.                            |                                     | Receive bytes                    | Send bytes back                        |
|                              |                                     |                                  |                                        |
|10.                           |                                     | Wrap it with JVM row             |                                        |
|                              |                                     |   Row(Array[Byte]);              |                                        |
|                              |                                     |   each record is an R DataFrame. |                                        |
|                              |                                     |                                  |                                        |
|11.                           | Row(Array[Byte]) are collected      |                                  |                                        |
|                              |                                     |                                  |                                        |
|12. Deserializes each as an   |                                     |                                  |                                        |
|    R data frame              |                                     |                                  |                                        |
{code}
The problem is that it uses {{BinaryType}} to wrap and ship the data. {{gapply}} is okay because its {{schema}} must be set, as with the Pandas Grouped Map UDF.
However, {{gapplyCollect}} omits the output schema, so the JVM side doesn't know the schema before execution.

So, for steps 8 to 12 above, this approach is fine in the regular (non-Arrow) code path because each R data frame can be serialized and deserialized individually and then combined later:
{code:java}
do.call("rbind", list(df, df, df ...)) 
{code}
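For illustration, the non-Arrow path in the diagram can be modelled in a few lines of Python. This is a minimal sketch, not SparkR's code: it assumes a row is a plain dict, a data frame is a list of rows, and {{pickle}} stands in for R serialization; {{executor_side}}, {{driver_side}} and {{count_per_key}} are hypothetical names. Because each group's output travels as one opaque binary value, nothing needs the output schema until the driver deserializes and concatenates the pieces, which is the role {{do.call("rbind", ...)}} plays in R.

{code:python}
import pickle
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

# Hypothetical stand-ins: a row is a dict and a "data frame" is a list of rows;
# pickle plays the role of R serialization. Models the diagram, not SparkR's code.
Row = Dict[str, Any]
Frame = List[Row]


def executor_side(rows: Frame, key: str, func: Callable[[Any, Frame], Frame]) -> List[bytes]:
    """Roughly steps 4-10: group rows by key, run the user function per group,
    and wrap each output frame as one opaque binary value (like Row(Array[Byte]))."""
    groups: DefaultDict[Any, Frame] = defaultdict(list)
    for row in rows:  # row by row, as in step 4
        groups[row[key]].append(row)
    # The JVM side never has to know which columns are inside each blob.
    return [pickle.dumps(func(k, group)) for k, group in groups.items()]


def driver_side(binary_rows: List[bytes]) -> Frame:
    """Steps 11-12: deserialize each collected blob and concatenate (like rbind)."""
    combined: Frame = []
    for blob in binary_rows:
        combined.extend(pickle.loads(blob))
    return combined


def count_per_key(k: Any, group: Frame) -> Frame:
    """Example user function; its output columns are never declared anywhere."""
    return [{"key": k, "n": len(group)}]


data = [{"a": 1, "b": 2.0}, {"a": 1, "b": 3.0}, {"a": 3, "b": 5.0}]
result = driver_side(executor_side(data, "a", count_per_key))
print(result)  # [{'key': 1, 'n': 2}, {'key': 3, 'n': 1}]
{code}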
However, with Arrow, the schema has to be known ahead of time because the Arrow streaming format is basically:
{code:java}
| Schema      |
|-------------|
| Arrow Batch |
|-------------|
| Arrow Batch |
|-------------|
   ...
{code}
(Conceptually, each {{df}} maps to one {{Arrow Batch}} above.)
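The ordering constraint can be sketched with a simplified Python model, assuming a stream is a sequence of (kind, payload) messages and a schema is a tuple of (column, type name) pairs. This is not Arrow's IPC encoding or pyarrow's API; {{write_stream}} and {{batch_schema}} are hypothetical helpers. The writer must emit the schema before the first batch, which {{gapply}} can do from its declared schema but {{gapplyCollect}} cannot.

{code:python}
from typing import Any, Dict, Iterable, Iterator, List, Tuple

# Simplified model of a schema-first stream (in the spirit of Arrow's streaming
# format): one schema message, then record batches that must match it.
# Hypothetical helpers; this is NOT Arrow's real IPC encoding.
Schema = Tuple[Tuple[str, str], ...]  # (column name, type name) pairs
Batch = Dict[str, List[Any]]          # column name -> column values


def batch_schema(batch: Batch) -> Schema:
    """Describe a batch by column name and the Python type of its first value.

    Assumes every column is non-empty; good enough for this illustration.
    """
    return tuple((name, type(values[0]).__name__) for name, values in batch.items())


def write_stream(schema: Schema, batches: Iterable[Batch]) -> Iterator[Tuple[str, Any]]:
    """Yield the schema message first, then each batch after checking it conforms."""
    yield ("schema", schema)
    for batch in batches:
        if batch_schema(batch) != schema:
            raise ValueError(f"batch schema {batch_schema(batch)} != stream schema {schema}")
        yield ("batch", batch)


# gapply-like case: the user declared the output schema up front.
declared: Schema = (("key", "int"), ("n", "int"))
frames = [{"key": [1], "n": [2]}, {"key": [3], "n": [1]}]
print([kind for kind, _ in write_stream(declared, frames)])  # ['schema', 'batch', 'batch']

# gapplyCollect-like case: there is no declared schema to pass as the first
# argument; it only becomes known after some group has produced output.
{code}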

This Arrow {{Schema}} is always converted from the {{DataFrame}}'s schema, but in {{gapplyCollect}} we don't know it because the output is wrapped in {{BinaryType}}.
So we need a different approach to implement {{gapplyCollect}} with Arrow optimization.
Also, the Python Arrow API seems to have an API that reads an Arrow table directly from Arrow batches, but the R API does not seem to have one. If the R API had it, the workaround might be easier.

There are a few ways to work around this problem.

  1. Read the schema from the first {{Arrow Batch}} and use it.
    I was trying this approach but realised it's pretty hacky and different from what we do on the Python side.

  2. Map each {{df}} to the whole Arrow streaming format above.
    This is a different protocol from the Python vectorization.

  3. Send Arrow batches one by one without the {{Schema}}.
    This is a different protocol from the Python vectorization.

  4. Don't support {{gapplyCollect}} with Arrow optimization, and warn that users should use the
{{collect}} and {{gapply}} combination instead (this is the current way).
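Approaches 1 and 2 can be contrasted with a simplified Python model, assuming a stream is a sequence of (kind, payload) messages. The helpers {{approach1_infer_from_first}} and {{approach2_stream_per_frame}} are hypothetical; this is neither Arrow's format nor SparkR's implementation. Approach 1 holds back the schema message until the first batch arrives and derives the schema from it; approach 2 wraps every output frame in its own complete schema-plus-batch stream.

{code:python}
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple

# Simplified stream model; hypothetical helpers only,
# not Arrow's IPC format and not SparkR's implementation.
Schema = Tuple[Tuple[str, str], ...]  # (column name, type name) pairs
Batch = Dict[str, List[Any]]          # column name -> column values
Message = Tuple[str, Any]             # ("schema", Schema) or ("batch", Batch)


def batch_schema(batch: Batch) -> Schema:
    """Column names plus the Python type of each column's first value (non-empty columns assumed)."""
    return tuple((name, type(values[0]).__name__) for name, values in batch.items())


def approach1_infer_from_first(batches: Iterable[Batch]) -> Iterator[Message]:
    """Approach 1: delay the schema message until the first batch, derive it from that batch."""
    schema: Optional[Schema] = None
    for batch in batches:
        if schema is None:
            schema = batch_schema(batch)
            yield ("schema", schema)
        elif batch_schema(batch) != schema:
            raise ValueError("batch does not match the schema inferred from the first batch")
        yield ("batch", batch)
    # With zero batches nothing is yielded, so no schema is ever emitted.


def approach2_stream_per_frame(batches: Iterable[Batch]) -> List[List[Message]]:
    """Approach 2: every output frame becomes its own complete schema + batch stream."""
    return [[("schema", batch_schema(b)), ("batch", b)] for b in batches]


frames = [{"key": [1], "n": [2]}, {"key": [3], "n": [1]}]
print([kind for kind, _ in approach1_infer_from_first(frames)])  # ['schema', 'batch', 'batch']
print(len(approach2_stream_per_frame(frames)))                   # 2
print(list(approach1_infer_from_first([])))                      # []
{code}

Two edge cases show up even in this toy: with no groups, approach 1 emits no schema at all, and the stream schema is simply whatever the first group returned.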

I just tried approach 1; it looks possible but pretty hacky, and at the very least it differs from the Python-side vectorization.
I haven't tried approach 2 yet, but I am sure it needs a bigger change than approach 1.
Approach 3 might need a smaller change.

So, my conclusion is to go with approach 4 for now, and implement it later with one of the other approaches. I am worried
about the maintenance overhead if we go ahead with an approach that differs from the Python side.
I would like to postpone {{gapplyCollect}} with Arrow optimization and start discussing it again
when users actually ask for it.

Currently, if users call {{gapplyCollect}} with Arrow optimization enabled, we just throw an exception telling them to use
{{gapply}} and {{collect}} separately.

I am leaving this JIRA as {{Later}} because the workaround is possible and easy anyway.



> Vectorized gapplyCollect, Arrow optimization in native R function execution
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-26858
>                 URL: https://issues.apache.org/jira/browse/SPARK-26858
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SparkR, SQL
>    Affects Versions: 3.0.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>
> Unlike gapply, gapplyCollect requires additional ser/de steps because it can omit the schema, and Spark SQL doesn't know the return type before execution actually happens.
> In the original code path, this is done by using a binary schema. Once gapply is done (SPARK-26761), we can mimic this approach in vectorized gapply to support gapplyCollect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


