spark-issues mailing list archives

From "Cheng Lian (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-15632) Dataset typed filter operation changes query plan schema
Date Tue, 31 May 2016 04:57:13 GMT

     [ https://issues.apache.org/jira/browse/SPARK-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Cheng Lian updated SPARK-15632:
-------------------------------
    Description: 
h1. Overview

Filter operations should never change the query plan schema. However, the Dataset typed filter
operation does introduce schema changes in some cases. In fact, all of the following aspects
of the schema may change:

# field order,
# field number,
# field data type,
# field name, and
# field nullability

This is mostly because we wrap the actual {{Filter}} operator in a {{SerializeFromObject}}/{{DeserializeToObject}}
pair (query plan fragment illustrated below), which performs a bunch of magic tricks.

{noformat}
SerializeFromObject
 Filter
  DeserializeToObject
   <child-plan>
{noformat}
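
The wrapping is easy to observe directly. The following is a minimal sketch; the exact operator
strings and plan shape vary across Spark versions, and the output is abbreviated:

{code}
// Inspect the analyzed plan of a typed filter (output illustrative only):
val ds = spark.range(3).filter(_ > 1)
println(ds.queryExecution.analyzed)
// SerializeFromObject [input[0, bigint, true] AS value#...]
// +- Filter <function1>.apply
//    +- DeserializeToObject ..., obj#...: java.lang.Long
//       +- Range (0, 3, ...)
{code}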

h1. Reproduction

h2. Field order, field number, and field data type change

{code}
case class A(b: Double, a: String)

val data = Seq(
  "{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
  "{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
  "{ 'a': 'bar', 'c': 'extra' }"
)

val df1 = spark.read.json(sc.parallelize(data))
df1.printSchema()
// root
//  |-- a: string (nullable = true)
//  |-- b: long (nullable = true)
//  |-- c: string (nullable = true)

val ds1 = df1.as[A]
ds1.printSchema()
// root
//  |-- a: string (nullable = true)
//  |-- b: long (nullable = true)
//  |-- c: string (nullable = true)

val ds2 = ds1.filter(_.b > 1)    // <- Here comes the trouble maker
ds2.printSchema()
// root                             <- 1. Reordered `a` and `b`, and
//  |-- b: double (nullable = true)    2. dropped `c`, and
//  |-- a: string (nullable = true)    3. up-casted `b` from long to double

val df2 = ds2.toDF()
df2.printSchema()
// root                             <- (Same as above)
//  |-- b: double (nullable = true)
//  |-- a: string (nullable = true)
{code}

h3. Field order change

{{DeserializeToObject}} resolves the encoder deserializer expression by *name*. Thus the field
order in the input query plan doesn't matter.
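
For example (a sketch reusing {{df1}} and {{A}} from above; the output is expected to match
{{ds2}}):

{code}
// Deserializer resolution is by name, so a reordered projection yields the
// same result schema after a typed filter:
val reordered = df1.select("c", "b", "a").as[A]
reordered.filter(_.b > 1).printSchema()
// root
//  |-- b: double (nullable = true)
//  |-- a: string (nullable = true)
{code}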

h3. Field number change

Same as above: fields not referred to by the encoder are silently dropped while resolving
deserializer expressions by name.

h3. Field data type change

When generating deserializer expressions, we allow "sane" implicit coercions (e.g., integer
to long, and long to double) by inserting {{UpCast}} operators. Thus the actual field data types
in the input query plan don't matter either, as long as valid implicit coercions exist.
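
Coercions that may lose information are rejected instead. For example (an illustrative sketch
with a hypothetical case class {{B}}; exact error message abbreviated):

{code}
// Long -> Double is an allowed up-cast, so `df1.as[A]` above succeeds, but a
// narrowing long -> int conversion fails at analysis time:
case class B(b: Int, a: String)
df1.as[B]
// org.apache.spark.sql.AnalysisException: Cannot up cast `b` from bigint to
// int as it may truncate ...
{code}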

h2. Field name and nullability change

{code}
val ds3 = spark.range(10)
ds3.printSchema()
// root
//  |-- id: long (nullable = false)

val ds4 = ds3.filter(_ > 3)
ds4.printSchema()
// root
//  |-- value: long (nullable = true)  4. Name changed from `id` to `value`, and
//                                     5. nullability changed from false to true
{code}

h3. Field name change

Primitive encoders like {{Encoder\[Long\]}} don't have named fields; they always produce a
single field with the hard-coded name "value". On the other hand, when serializing domain
objects back to rows, the schema of {{SerializeFromObject}} is solely determined by the encoder.
Thus the original name {{id}} becomes {{value}}.
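
This can be observed directly (a sketch, assuming {{spark.implicits._}} is in scope as in the
shell):

{code}
// Primitive encoders always produce a single field hard-coded as "value":
Seq(1L, 2L, 3L).toDS().printSchema()
// root
//  |-- value: long (nullable = false)
{code}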

h3. Nullability change

[PR #11880|https://github.com/apache/spark/pull/11880] updated the return type of {{SparkSession.range}}
from {{Dataset\[Long\]}} to {{Dataset\[java.lang.Long\]}} due to [SI-4388|https://issues.scala-lang.org/browse/SI-4388].
As a consequence, although the underlying {{Range}} operator produces non-nullable output,
the resulting encoder is nullable since {{java.lang.Long}} is nullable. Thus, we observe the
nullability change after typed filtering, because the serializer expression is derived from
the encoder rather than from the query plan.
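
Note that the untyped, {{Column}}-based filter doesn't wrap the plan in a serialize/deserialize
pair, so it preserves the original schema (a sketch continuing the example above):

{code}
// The untyped filter keeps the child plan's schema intact:
ds3.filter($"id" > 3).printSchema()
// root
//  |-- id: long (nullable = false)
{code}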


  was:
Filter operations should never change the query plan schema. However, the Dataset typed filter
operation does introduce schema changes:

{code}
case class A(b: Double, a: String)

val data = Seq(
  "{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
  "{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
  "{ 'a': 'bar', 'c': 'extra' }"
)

val df1 = spark.read.json(sc.parallelize(data))
df1.printSchema()
// root
//  |-- a: string (nullable = true)
//  |-- b: long (nullable = true)
//  |-- c: string (nullable = true)

val ds1 = df1.as[A]
ds1.printSchema()
// root
//  |-- a: string (nullable = true)
//  |-- b: long (nullable = true)
//  |-- c: string (nullable = true)

val ds2 = ds1.filter(_.b > 1)    // <- Here comes the trouble maker
ds2.printSchema()
// root                             <- 1. reordered `a` and `b`, and
//  |-- b: double (nullable = true)    2. dropped `c`, and
//  |-- a: string (nullable = true)    3. up-casted `b` from long to double

val df2 = ds2.toDF()
df2.printSchema()
// root                             <- (Same as above)
//  |-- b: double (nullable = true)
//  |-- a: string (nullable = true)
{code}

This is because we wrap the actual {{Filter}} operator in a {{SerializeFromObject}}/{{DeserializeToObject}}
pair.

{{DeserializeToObject}} does a bunch of magic tricks here:

# Field order change
#- {{DeserializeToObject}} resolves the encoder deserializer expression by *name*. Thus
the field order in the input query plan doesn't matter.
# Field number change
#- Same as above: fields not referred to by the encoder are silently dropped while resolving
deserializer expressions by name.
# Field data type change
#- When generating deserializer expressions, we allow "sane" implicit coercions (e.g., integer
to long, and long to double) by inserting {{UpCast}} operators. Thus the actual field data types
in the input query plan don't matter either, as long as valid implicit coercions exist.

Actually, even field names may change once [PR #13269|https://github.com/apache/spark/pull/13269]
gets merged, because it introduces case-insensitive encoder resolution.


> Dataset typed filter operation changes query plan schema
> --------------------------------------------------------
>
>                 Key: SPARK-15632
>                 URL: https://issues.apache.org/jira/browse/SPARK-15632
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Cheng Lian
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
