Oh sorry, I'm rereading your email more carefully.  It's only because you have some setup code that you want to amortize?

On Mon, Jan 5, 2015 at 10:40 PM, Michael Armbrust <michael@databricks.com> wrote:
The types expected by applySchema are documented in the type reference section: http://spark.apache.org/docs/latest/sql-programming-guide.html#spark-sql-datatype-reference

I'd certainly accept a PR to improve the docs and add a link to this from the applySchema section :)

Can you explain why you are using mapPartitions and why UDFs don't work for you?  SQL doesn't really have great support for partitions in general... We do support Hive TGFs, though, and we could possibly add better Scala syntax for this concept, or something else.
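For readers following the thread, a minimal sketch of the UDF route referred to above, assuming the Spark 1.2 `SQLContext.registerFunction` API (the function name `myUDF` and the query are illustrative, not taken from this thread):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("udf-sketch"))
val sqlc = new SQLContext(sc)

// Register a plain Scala function as a SQL UDF (Spark 1.2 API).
// It is invoked once per row, so per-partition setup cannot be
// amortized here -- which is the limitation discussed below.
sqlc.registerFunction("myUDF", (team: String) => team.toUpperCase)

// Once a table is registered, the UDF can be used per row, e.g.:
//   sqlc.sql("SELECT myUDF(team) FROM sometable")
```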

On Mon, Jan 5, 2015 at 9:52 PM, Tobias Pfeiffer <tgp@preferred.jp> wrote:

I have a SchemaRDD where I want to add a column with a value that is
computed from the rest of the row. As the computation involves a
network operation and requires setup code, I can't use
  "SELECT *, myUDF(*) FROM rdd",
but I wanted to use a combination of:

 - get schema of input SchemaRDD
 - issue a mapPartitions call (including the setup code), obtaining a
   new RDD[Row]
 - extend the schema manually
 - create a new SchemaRDD by combining the RDD[Row] with the
   extended schema (using applySchema)

This works very well, but I run into trouble querying that resulting
SchemaRDD with SQL if:

 - the result of my computation is a case class
 - and I want to use values in this case class in the SQL query.

In particular, while

  SELECT column FROM resultrdd

works well,

  SELECT column.key_name FROM resultrdd

gives a

  java.lang.ClassCastException: example.MyCaseClass cannot be cast to org.apache.spark.sql.catalyst.expressions.Row

Here is an example to illustrate that:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

val sc = new SparkContext("local[3]", "Test")
val sqlc = new SQLContext(sc)
import sqlc._

// this is the case class that my operation is returning
case class Result(string_values: Map[String, String],
                  num_values: Map[String, Double])

// dummy result data
val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
            Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
val rdd = sc.parallelize(data)

// simulate my computation by creating an RDD[Row] and creating
// a schema programmatically
val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
val progSchema = StructType(StructField("hello", IntegerType, false) ::
                            StructField("newcol", rdd.schema, true) :: Nil)
val progRdd = sqlc.applySchema(rowRdd, progSchema)
progRdd.registerTempTable("progrdd")

// the following call will *fail* with a ClassCastException
sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)

// however, the schema I specified is correct. see how embedding
// my result in a proper case class works:
case class ResultContainer(hello: Int, newcol: Result)
val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
caseClassRdd.registerTempTable("caseclassrdd")

// the following call will *work*
sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)

// even though the schema for both RDDs is the same:
progRdd.schema == caseClassRdd.schema


It turns out that I cannot use the case class directly, but I have to
convert it to a Row as well. That is, instead of

  val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))

I have to use

  val rowRdd = rdd.map(dr => Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))

and then, I can use

  SELECT newcol.string_values['team'] FROM progrdd
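To avoid repeating that conversion by hand, the pattern above can be generalized. Here is a hypothetical helper (`toRow` is my own name, not part of Spark) that recursively turns a case class into a Row, assuming Spark 1.2's `Row.fromSeq` as used above:

```scala
import org.apache.spark.sql.catalyst.expressions.Row

// Hypothetical helper (not part of Spark): recursively convert a case
// class (any Product) into a Row, so that nested case classes become
// nested Rows instead of opaque Scala objects that Catalyst cannot cast.
// Caveat: Tuples and Options are also Products and would be converted
// too, which may or may not be what you want.
def toRow(p: Product): Row =
  Row.fromSeq(p.productIterator.map {
    case nested: Product => toRow(nested) // recurse into nested case classes
    case other           => other         // keep primitives, Maps, etc. as-is
  }.toSeq)
```

With that helper, the mapping would read `rdd.map(dr => Row.fromSeq(7 :: toRow(dr) :: Nil))`.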

So now I have found that out and I'm happy that it works, but it was
quite hard to track down, so I was wondering whether this is the most
intuitive way to add a column to a SchemaRDD using mapPartitions (as
opposed to using a UDF, where the conversion from case class to Row
seems to happen automatically).

Or, even if there is no more intuitive way, I just wanted to have this
documented ;-)