Hi all!

I'm using Spark 1.3.0 and I'm struggling with a definition of a new type for a project I'm working on. I've created a case class Person(name: String) and now I'm trying to make Spark to be able serialize and deserialize the defined type. I made a couple of attempts but none of them did not work in 100% (there were issues either in serialization or deserialization). 

This is my class and the corresponding UDT.

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType = StructType(Seq(StructField("name", StringType)))

  override def serialize(obj: Any): Seq[Any] = {
    obj match {
      case c: Person =>
        Seq(c.name)
    }
  }

  override def userClass: Class[Person] = classOf[Person]

  override def deserialize(datum: Any): Person = {
    datum match {
      case values: Seq[_] =>
        assert(values.length == 1)
        Person(values.head.asInstanceOf[String])
      case values: util.ArrayList[_] =>
        Person(values.get(0).asInstanceOf[String])
    }
  }
  
  // In some other attempt I was creating RDD of Seq with manually serialized data and 
  // I had to override equals because two DFs with the same type weren't actually equal
  // StructField(person,...types.PersonUDT@a096ac3)
  // StructField(person,...types.PersonUDT@613fd937)
  def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
 
  override def equals(other: Any): Boolean = other match {
    case that: PersonUDT => true
    case _ => false
  }

  override def hashCode(): Int = 1
}

This is how I create RDD of Person and then try to create a DataFrame
val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
val sparkDataFrame = sqlContext.createDataFrame(rdd)

The second line throws an exception:
java.lang.ClassCastException: ....types.PersonUDT cannot be cast to org.apache.spark.sql.types.StructType 
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)

I looked into the code in SQLContext.scala and it seems that the code requires UDT to be extending StructType but in fact it extends UserDefinedType which extends directly DataType.
I'm not sure whether it is a bug or I just don't know how to use UDTs.

Do you have any suggestions how to solve this? I based my UDT on ExamplePointUDT but it seems to be incorrect. Is there a working example for UDT?


Thank you for the reply in advance!
Wojtek