spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Wojtek Jurczyk <wojtek.jurc...@gmail.com>
Subject User Defined Type (UDT)
Date Thu, 07 May 2015 09:10:51 GMT
Hi all!

I'm using Spark 1.3.0 and I'm struggling with a definition of a new type
for a project I'm working on. I've created a case class Person(name:
String) and now I'm trying to make Spark to be able serialize and
deserialize the defined type. I made a couple of attempts but none of them
did not work in 100% (there were issues either in serialization or
deserialization).

This is my class and the corresponding UDT.

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType = StructType(Seq(StructField("name",
StringType)))

  override def serialize(obj: Any): Seq[Any] = {
    obj match {
      case c: Person =>
        Seq(c.name)
    }
  }

  override def userClass: Class[Person] = classOf[Person]

  override def deserialize(datum: Any): Person = {
    datum match {
      case values: Seq[_] =>
        assert(values.length == 1)
        Person(values.head.asInstanceOf[String])
      case values: util.ArrayList[_] =>
        Person(values.get(0).asInstanceOf[String])
    }
  }

  // In some other attempt I was creating RDD of Seq with manually
serialized data and
  // I had to override equals because two DFs with the same type weren't
actually equal
  // StructField(person,...types.PersonUDT@a096ac3)
  // StructField(person,...types.PersonUDT@613fd937)
  def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]

  override def equals(other: Any): Boolean = other match {
    case that: PersonUDT => true
    case _ => false
  }

  override def hashCode(): Int = 1
}

This is how I create RDD of Person and then try to create a DataFrame
val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
val sparkDataFrame = sqlContext.createDataFrame(rdd)

The second line throws an exception:
java.lang.ClassCastException: ....types.PersonUDT cannot be cast to
org.apache.spark.sql.types.StructType
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)

I looked into the code in SQLContext.scala and it seems that the code
requires UDT to be extending StructType but in fact it extends
UserDefinedType which extends directly DataType.
I'm not sure whether it is a bug or I just don't know how to use UDTs.

Do you have any suggestions how to solve this? I based my UDT on
ExamplePointUDT but it seems to be incorrect. Is there a working example
for UDT?


Thank you for the reply in advance!
Wojtek

Mime
View raw message