spark-issues mailing list archives

From "Frank Ottey (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-27013) Add support for external encoders when resolving org.apache.spark.sql.catalyst.encoders.ExpressionEncoder's apply method
Date Thu, 28 Feb 2019 19:25:00 GMT
Frank Ottey created SPARK-27013:
-----------------------------------

             Summary: Add support for external encoders when resolving org.apache.spark.sql.catalyst.encoders.ExpressionEncoder's apply method
                 Key: SPARK-27013
                 URL: https://issues.apache.org/jira/browse/SPARK-27013
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 2.4.0
            Reporter: Frank Ottey


I recently discovered that, because most of the common implicit encoders introduced by 

{noformat}import spark.implicits._{noformat}

reduce to a call to {{ExpressionEncoder}}'s {{apply}} method, it's _very_ difficult to generate and/or operate on {{Column}}s whose internal types reduce to some Scala type that wraps an external type, even when an implicit encoder for that external type is available or could be trivially generated. See the example below:

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

import com.example.MyBean

object Example {
    // An implicit encoder for the bean type is trivially available...
    implicit def beanEncoder: Encoder[MyBean] = Encoders.bean(classOf[MyBean])

    def main(args: Array[String]): Unit = {

        val path = args(0)

        val spark: SparkSession = ???

        import spark.implicits._

        // THE FOLLOWING DOES NOT WORK!
        // The implicit encoder for Seq[_] is found and used...
        // It reduces to ExpressionEncoder's apply method, which unwraps
        // the inner type com.example.MyBean...
        // ScalaReflection.serializerFor() cannot find an encoder for that
        // type, even though we can trivially create one, as above!

        // Fails at runtime with an UnsupportedOperationException from
        // ScalaReflection.serializerFor()
        val ds = spark.read
                      .format("avro")
                      .option("compression", "snappy")
                      .load(path)
                      .select($"myColumn".as[Seq[MyBean]])
    }
}
{code}

What's particularly frustrating is that if we were using any user-defined case class instead of the Java bean type, this would not be a problem, as the structuring of the various implicit encoders in the related packages seems to allow {{ScalaReflection.serializerFor()}} to work on arbitrary {{scala.Product}} types... (There's an implicit encoder in {{org.apache.spark.sql.Encoders}} that looks relevant.)
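
For contrast, here is a minimal sketch of the case that _does_ work, using a hypothetical case class {{MyRecord}} (the field names are invented for illustration) in place of the bean, with {{spark}} and {{path}} as in the example above:

{code:scala}
// Hypothetical Product type mirroring MyBean's fields,
// defined at top level rather than inside main.
case class MyRecord(id: Long, name: String)

// Because MyRecord is a scala.Product, spark.implicits._ can derive
// Encoder[Seq[MyRecord]] end-to-end via ScalaReflection, so the same
// select() succeeds:
val ds = spark.read
              .format("avro")
              .option("compression", "snappy")
              .load(path)
              .select($"myColumn".as[Seq[MyRecord]])
{code}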

I realize that there are workarounds, such as wrapping the types and then using a simple {{.map()}} (sketched below), or falling back to Kryo or Java serialization, but my understanding is that this would mean giving up on potential Catalyst optimizations...
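
A sketch of that {{.map()}} workaround, reusing the hypothetical {{MyRecord}} above and assuming conventional setters on {{MyBean}}; the mapped column becomes an opaque binary blob under the Kryo encoder, which is exactly where the Catalyst optimizations are lost:

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders}

// Explicit Kryo encoder for the bean type; rows are stored as binary,
// so Catalyst can no longer see or optimize the individual fields.
implicit val beanSeqEncoder: Encoder[Seq[MyBean]] = Encoders.kryo[Seq[MyBean]]

val ds = spark.read
              .format("avro")
              .option("compression", "snappy")
              .load(path)
              .select($"myColumn".as[Seq[MyRecord]]) // derivable Product encoder
              .map { recs =>
                recs.map { r =>
                  val b = new MyBean()
                  b.setId(r.id)      // hypothetical setters
                  b.setName(r.name)
                  b
                }
              } // Dataset[Seq[MyBean]] via the Kryo encoder above
{code}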

It would be really nice if there were a simple way to tell {{ScalaReflection.serializerFor()}} to look for and use other, potentially user-defined encoders, especially if they could be generated from the factory encoder methods supplied by Spark itself...

Alternatively, it would be exceptionally nice if calls to {{ExpressionEncoder}}'s {{apply}} method supported expressions whose types include {{java.util.List}} or arbitrary Java bean types as well as {{scala.Product}} types.

See [this question|https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset] on Stack Overflow for further details...



