spark-user mailing list archives

From "Kazuaki Ishizaki" <ISHIZ...@jp.ibm.com>
Subject Re: Change nullable property in Dataset schema
Date Wed, 10 Aug 2016 07:04:54 GMT
After some investigation, I was able to change the nullable property in
Dataset[Array[Int]] in the following way. Is this the right way?

(1) Apply https://github.com/apache/spark/pull/13873
(2) Use two Encoders: first a RowEncoder, then the predefined
ExpressionEncoder.

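import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    // Pass through a RowEncoder whose schema marks "value" as non-nullable,
    // then return to the predefined ExpressionEncoder for Array[Int].
    val ds2 = ds1.map(e => e)
      .as(RowEncoder(new StructType()
        .add("value", ArrayType(IntegerType, false), nullable = false)))
      .as(newIntArrayEncoder)
    ds1.printSchema
    ds2.printSchema
  }
}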

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = false)
 |    |-- element: integer (containsNull = false)


Kazuaki Ishizaki



From:   Kazuaki Ishizaki/Japan/IBM@IBMJP
To:     user@spark.apache.org
Date:   2016/08/03 23:46
Subject:        Change nullable property in Dataset schema



Dear all,
Could anyone let me know how to change the nullable property in a
Dataset schema?

When I searched for how to change the nullable property in a DataFrame
schema, I found the following approaches:
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe

https://github.com/apache/spark/pull/13873 (not merged yet)
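For reference, one commonly suggested DataFrame-side approach is to rebuild the DataFrame from its RDD[Row] with an edited schema. The following is a minimal sketch, assuming the Spark 2.0 public API (SparkSession.createDataFrame(RDD[Row], StructType)); the object NullableSketch and the helper setNullable are hypothetical names. Spark does not check the existing data against the new flag, so marking a column non-nullable is only safe when it truly contains no nulls.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

object NullableSketch {
  // Hypothetical helper: returns a DataFrame with the same rows as `df`, but whose
  // top-level column `column` has its nullable flag set to `nullable`.
  // Spark does not validate the existing data against the new flag.
  def setNullable(df: DataFrame, column: String, nullable: Boolean): DataFrame = {
    val newSchema = StructType(df.schema.map { field =>
      if (field.name == column) field.copy(nullable = nullable) else field
    })
    // Rebuild the DataFrame from its RDD[Row] using the edited schema.
    df.sparkSession.createDataFrame(df.rdd, newSchema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nullable-sketch").getOrCreate()
    import spark.implicits._
    val df = Seq(Array(1, 1), Array(2, 2), Array(3, 3)).toDF("value")
    df.printSchema()
    setNullable(df, "value", nullable = false).printSchema()
    spark.stop()
  }
}
```

This only changes the schema of a DataFrame; it does not by itself address the Dataset[T] case discussed below.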

However, I cannot find a way to change the nullable property in a Dataset
schema. Even with the following program, the nullable property of "value:
array" in ds2.schema does not change.
If my understanding is correct, Spark 2.0 currently uses an
ExpressionEncoder that is generated from the type T of Dataset[T] at
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46


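import scala.reflect.ClassTag
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    // Hand-built encoder whose schema declares the array column as non-nullable.
    val schema = new StructType().add("array", ArrayType(IntegerType, false), false)
    val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[Array[Int]], false)
    val encoder = new ExpressionEncoder[Array[Int]](schema, true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    // Map using the custom encoder; ds2's schema still reports nullable = true.
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}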

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)                         // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)
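The default "nullable = true" on the top-level array can be seen without running a job. As far as I can tell, in Spark 2.0 ExpressionEncoder derives its schema from T through ScalaReflection.schemaFor, which is an internal catalyst API that may change between versions. A minimal sketch, assuming spark-catalyst is on the classpath; the object name SchemaForSketch is hypothetical.

```scala
import org.apache.spark.sql.catalyst.ScalaReflection

object SchemaForSketch {
  def main(args: Array[String]): Unit = {
    // Schema inferred purely from the Scala type Array[Int] (no data involved):
    // the array itself is treated as nullable, while its Int elements are not.
    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[Array[Int]]
    println(s"dataType = $dataType, nullable = $nullable")
  }
}
```

Because a JVM array reference can be null, the type alone gives Spark no way to know that the column never holds nulls, which matches the ds1 output above.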


Kazuaki Ishizaki


