spark-user mailing list archives

From: Michael Armbrust <mich...@databricks.com>
Subject: Re: Add StructType column to SchemaRDD
Date: Tue, 06 Jan 2015 06:40:28 GMT
The types expected by applySchema are documented in the type reference
section:
http://spark.apache.org/docs/latest/sql-programming-guide.html#spark-sql-datatype-reference

I'd certainly accept a PR to improve the docs and add a link to this from
the applySchema section :)

Can you explain why you are using mapPartitions and why UDFs don't work
for you?  SQL doesn't really have great support for partitions in
general... We do support Hive TGFs, though, and we could possibly add
better Scala syntax for this concept, or some other mechanism.
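
For reference, registering a plain Scala function as a UDF looks
roughly like this (a minimal sketch; myUDF and its argument type are
placeholders, and sqlc is the SQLContext from the example below):

  // Hypothetical UDF: register a Scala closure under a SQL name and
  // call it from a query. Any setup code would have to live inside
  // the closure and run on every call, not once per partition.
  sqlc.registerFunction("myUDF", (team: String) => team.toUpperCase)
  sqlc.sql("SELECT myUDF(column) FROM resultrdd").foreach(println)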

On Mon, Jan 5, 2015 at 9:52 PM, Tobias Pfeiffer <tgp@preferred.jp> wrote:

> Hi,
>
> I have a SchemaRDD where I want to add a column with a value that is
> computed from the rest of the row. As the computation involves a
> network operation and requires setup code, I can't use
>   "SELECT *, myUDF(*) FROM rdd",
> so instead I wanted to use a combination of:
>
>  - get schema of input SchemaRDD
>  - issue a mapPartitions call (including the setup code), obtaining a
>    new RDD[Row] (see the sketch below)
>  - extend the schema manually
>  - create a new RDD by combining the RDD[Row] with the extended
>    schema.
>
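> A rough sketch of what I mean by the mapPartitions step (schemaRdd is
> the input SchemaRDD, and createConnection and computeValue stand in
> for my actual setup code and network operation):
>
>   val rowRdd = schemaRdd.mapPartitions { rows =>
>     // setup code runs once per partition, not once per row
>     val conn = createConnection()
>     // append the computed value to each row; since the iterator is
>     // lazy, closing conn cleanly would take some extra care
>     rows.map(row => Row.fromSeq(row.toSeq :+ computeValue(conn, row)))
>   }
>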
> This works very well, but I run into trouble querying that resulting
> SchemaRDD with SQL if:
>
>  - the result of my computation is a case class
>  - and I want to use values in this case class in the SQL query.
>
> In particular, while
>
>   SELECT column FROM resultrdd
>
> works well,
>
>   SELECT column.key_name FROM resultrdd
>
> gives a
>
>   java.lang.ClassCastException: example.MyCaseClass cannot be cast to
> org.apache.spark.sql.catalyst.expressions.Row
>
> Here is an example to illustrate that:
>
> -----------------------------------
>
> import org.apache.spark._
> import org.apache.spark.sql._
> import org.apache.spark.sql.catalyst.types._
>
> val sc = new SparkContext("local[3]", "Test")
> val sqlc = new SQLContext(sc)
> import sqlc._
>
> // this is the case class that my operation is returning
> case class Result(string_values: Map[String, String],
>                   num_values: Map[String, Double])
>
> // dummy result data
> val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
>             Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
> val rdd = sc.parallelize(data)
>
> // simulate my computation by creating an RDD[Row] and creating
> // a schema programmatically
> val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
> val progSchema = StructType(StructField("hello", IntegerType, false) ::
>                             StructField("newcol", rdd.schema, true) :: Nil)
> val progRdd = sqlc.applySchema(rowRdd, progSchema)
> progRdd.registerTempTable("progrdd")
>
> // the following call will *fail* with a ClassCastException
> sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)
>
> // however, the schema I specified is correct. see how embedding
> // my result in a proper case class works:
> case class ResultContainer(hello: Int, newcol: Result)
> val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
> caseClassRdd.registerTempTable("caseclassrdd")
>
> // the following call will *work*
> sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)
>
> // even though the schema for both RDDs is the same:
> progRdd.schema == caseClassRdd.schema
>
>
> -----------------------------------
>
> It turns out that I cannot use the case class directly, but I have to
> convert it to a Row as well. That is, instead of
>
>   val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
>
> I have to use
>
>   val rowRdd = rdd.map(dr => Row.fromSeq(7 ::
>     Row.fromSeq(dr.productIterator.toSeq) :: Nil))
>
> and then, I can use
>
>   SELECT newcol.string_values['team'] FROM progrdd
>
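> The general pattern seems to be that nested case classes must be
> converted to nested Rows recursively. A sketch of a helper that would
> do this (hypothetical, and assuming nested values are either case
> classes or plain values; note that tuples and Options are also
> Products and would be converted as well):
>
>   // Recursively convert a case class (a Product) into a Row, so that
>   // nested case classes become the nested Rows Spark SQL expects.
>   def toRow(p: Product): Row =
>     Row.fromSeq(p.productIterator.map {
>       case nested: Product => toRow(nested)
>       case other           => other
>     }.toSeq)
>
>   val rowRdd = rdd.map(dr => Row.fromSeq(7 :: toRow(dr) :: Nil))
>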
> So now I have figured that out and I'm happy that it works, but it
> was quite hard to track down, so I was wondering whether this is the most
> intuitive way to add a column to a SchemaRDD using mapPartitions (as
> opposed to using a UDF, where the conversion "case class -> Row"
> seems to happen automatically).
>
> Or, even if there is no more intuitive way, I just wanted to have this
> documented ;-)
>
> Thanks
> Tobias
>
>
