spark-issues mailing list archives

From "Simeon Simeonov (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-30127) UDF should work for case class like Dataset operations
Date Wed, 11 Dec 2019 17:43:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-30127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16993750#comment-16993750 ]

Simeon Simeonov edited comment on SPARK-30127 at 12/11/19 5:42 PM:
-------------------------------------------------------------------

The ability to transform one or more columns with native code, ignoring the rest of the schema,
is sorely missed. Some may think that Dataset operations such as {{map}}/{{flatMap}} could
be used to work around the need for this feature. That is true only when the Scala type of
the full schema is (a) known in advance and (b) unchanging, which is impractical in many
real-world use cases. Even where {{map}}/{{flatMap}} could work, there is a performance cost
to converting the entire row to/from the internal row format, as opposed to converting just
the columns that are needed.
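
To make the cost concrete, here is a minimal sketch using plain Scala collections as a stand-in
for a Dataset (the {{Event}} type and data are hypothetical): even though the function reads only
{{id}} and {{tags}}, it must be written against the full row type.
{code:scala}
// Hypothetical row type with several fields; the transformation below
// needs only `id` and `tags`, yet must name the whole type.
case class Event(id: Long, user: String, tags: Seq[String])

val events = Seq(
  Event(1L, "alice", Seq("spark", "sql")),
  Event(2L, "bob", Seq("udf"))
)

// The Dataset.flatMap workaround: on a real Dataset, the entire Event
// is converted from the internal row format per record, not just the
// two fields actually used.
val exploded = events.flatMap(e => e.tags.map(t => (e.id, t)))
// exploded == Seq((1L, "spark"), (1L, "sql"), (2L, "udf"))
{code}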

However, UDFs are only one modality for exposing this capability and, given the Scala registration
requirement for UDFs, not necessarily the best one. If we add this capability for UDFs,
I would suggest we also enhance the Dataset API with column-level {{map}}/{{flatMap}} functionality,
e.g.,
{code:scala}
def flatMapColumn[C: Encoder, U: Encoder](colName: String)(func: C => TraversableOnce[U]): Dataset[U]
{code}
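
As a plain-collections model of the proposed semantics (hypothetical names; the getter stands in
for name-based column resolution, which Spark would do internally), the operation amounts to:
{code:scala}
// Hypothetical model over plain collections: extract one "column" via a
// getter and flatMap over it, ignoring the rest of the record.
def flatMapColumn[R, C, U](rows: Seq[R])(get: R => C)(func: C => Iterable[U]): Seq[U] =
  rows.flatMap(r => func(get(r)))

case class Event(id: Long, user: String, tags: Seq[String])
val events = Seq(Event(1L, "alice", Seq("spark", "sql")), Event(2L, "bob", Seq("udf")))

// The caller names only the column's type, not the row's.
val allTags = flatMapColumn(events)(_.tags)(identity)
// allTags == Seq("spark", "sql", "udf")
{code}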
Multiple columns can be passed in using {{functions.struct(col1, col2, ...)}} and mapped to
a {{C}} that is a {{TupleN}}. If that incurs additional processing (internal buffer copying,
serialization/deserialization), it would be trivial to add versions for two and three columns,
which would cover 99+% of all uses (and would be transparent to users if we renamed
{{flatMapColumn}} to {{flatMapColumns}} and {{colName}} to {{colName1}} above):
{code:scala}
def flatMapColumns[C1, C2, U](colName1: String, colName2: String)
  (func: (C1, C2) => TraversableOnce[U])
  (implicit evC: Encoder[(C1, C2)], evU: Encoder[U]): Dataset[U]

def flatMapColumns[C1, C2, C3, U](colName1: String, colName2: String, colName3: String)
  (func: (C1, C2, C3) => TraversableOnce[U])
  (implicit evC: Encoder[(C1, C2, C3)], evU: Encoder[U]): Dataset[U]
{code}
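
A collections sketch of the two-column overload's call-site ergonomics (again hypothetical;
the two getters stand in for resolving the two named columns):
{code:scala}
// Hypothetical two-column model: each getter stands in for resolving one
// named column; the user function sees only the two extracted values.
def flatMapColumns[R, C1, C2, U](rows: Seq[R])(get1: R => C1, get2: R => C2)(
    func: (C1, C2) => Iterable[U]): Seq[U] =
  rows.flatMap(r => func(get1(r), get2(r)))

case class Event(id: Long, user: String, tags: Seq[String])
val events = Seq(Event(1L, "alice", Seq("spark", "sql")), Event(2L, "bob", Seq("udf")))

val tagged = flatMapColumns(events)(_.id, _.tags)((id, tags) => tags.map(t => (id, t)))
// tagged == Seq((1L, "spark"), (1L, "sql"), (2L, "udf"))
{code}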
[~cloud_fan] There are at least three benefits to adding this capability.
 # It provides a fundamental missing capability to the Dataset API: transforming data while
knowing only part of the schema.
 # It makes the feature more convenient to use from Java, without the need for {{TypeTag}},
and consistent with {{map}}/{{flatMap}} behavior (via {{MapFunction}}/{{FlatMapFunction}}).
Given Java's popularity, this is a big plus.
 # Unless I am mistaken, it may allow for more optimization than using UDFs.


> UDF should work for case class like Dataset operations
> ------------------------------------------------------
>
>                 Key: SPARK-30127
>                 URL: https://issues.apache.org/jira/browse/SPARK-30127
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Priority: Major
>
> Currently, Spark UDF can only work on data types like java.lang.String, o.a.s.sql.Row,
> Seq[_], etc. This is inconvenient if you want to apply an operation on one column, and the
> column is struct type. You must access data from a Row object, instead of your domain object
> like Dataset operations. It will be great if UDF can work on types that are supported by
> Dataset, e.g. case classes.
> Note that, there are multiple ways to register a UDF, and it's only possible to support
> this feature if the UDF is registered using Scala API that provides type tag, e.g.
> `def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT])`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

