spark-user mailing list archives

From "Kazuaki Ishizaki" <ISHIZ...@jp.ibm.com>
Subject Re: Change nullable property in Dataset schema
Date Wed, 17 Aug 2016 06:18:03 GMT
My motivation is to simplify the Java code generated by Tungsten's code 
generator.

Here is a dump of generated code from the program.
https://gist.github.com/kiszk/402bd8bc45a14be29acb3674ebc4df24

If we can succeed in telling Catalyst that the result of map() is never 
null, we can eliminate conditional branches.
For example, in the generated code at the above URL, the condition at line 
45 is always false, since our schema says the result of map() is never 
null. As a result, we can eliminate the assignments at lines 52 and 56, 
and the conditional branches at lines 55 and 61.

Kazuaki Ishizaki



From:   Koert Kuipers <koert@tresata.com>
To:     Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc:     "user@spark.apache.org" <user@spark.apache.org>
Date:   2016/08/16 04:35
Subject:        Re: Change nullable property in Dataset schema



why do you want the array to have nullable = false? what is the benefit?

On Wed, Aug 3, 2016 at 10:45 AM, Kazuaki Ishizaki <ISHIZAKI@jp.ibm.com> 
wrote:
Dear all,
Would it be possible to let me know how to change the nullable property in 
a Dataset?

When I looked for how to change the nullable property in a DataFrame 
schema, I found the following approaches.
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe

https://github.com/apache/spark/pull/13873 (not merged yet)
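For reference, the Stack Overflow approach boils down to copying the schema 
with the nullable flags flipped and then recreating the DataFrame on top of 
the existing rows. A minimal sketch (the helper name asNonNullable is mine):

```scala
import org.apache.spark.sql.types._

// Copy every top-level field of a schema with nullable = false.
def asNonNullable(schema: StructType): StructType =
  StructType(schema.fields.map(_.copy(nullable = false)))

// With an active SparkSession, the DataFrame is then rebuilt on its own rows:
//   val df2 = spark.createDataFrame(df.rdd, asNonNullable(df.schema))

val before = new StructType()
  .add("array", ArrayType(IntegerType, containsNull = false), nullable = true)
println(asNonNullable(before)("array").nullable)  // false
```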

However, I cannot find how to change the nullable property in a Dataset 
schema. Even when I wrote the following program, the nullable property for 
"value: array" in ds2.schema is not changed.
If my understanding is correct, the current Spark 2.0 uses an 
ExpressionEncoder that is generated based on Dataset[T] at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46


import scala.reflect.ClassTag

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("test") {
    val ds1 = sparkContext.parallelize(
      Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    // A schema that declares both the array column and its elements non-nullable.
    val schema = new StructType()
      .add("array", ArrayType(IntegerType, containsNull = false), nullable = false)
    val inputObject = BoundReference(
      0, ScalaReflection.dataTypeFor[Array[Int]], nullable = false)
    // Build an encoder by hand so the non-nullable schema can be passed in.
    val encoder = new ExpressionEncoder[Array[Int]](schema, flat = true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)          // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)
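For what it's worth, the mismatch above can also be checked 
programmatically rather than by eye; a small sketch over the same printed 
schema:

```scala
import org.apache.spark.sql.types._

// Rebuild the schema printed above and inspect the flags directly.
val observed = new StructType()
  .add("value", ArrayType(IntegerType, containsNull = false), nullable = true)

val field = observed("value")
println(field.nullable)                                       // true today; false is what I expect
println(field.dataType.asInstanceOf[ArrayType].containsNull)  // false
```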


Kazuaki Ishizaki



