spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Armbrust <mich...@databricks.com>
Subject Re: Parameterized types and Datasets - Spark 2.1.0
Date Wed, 01 Feb 2017 21:16:47 GMT
You need to enforce that an Encoder is available for the type A using a context
bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.

import org.apache.spark.sql.Encoder
abstract class RawTable[A : Encoder](inDir: String) {
  ...
}

On Tue, Jan 31, 2017 at 8:12 PM, Don Drake <dondrake@gmail.com> wrote:

> I have a set of CSV that I need to perform ETL on, with the plan to re-use
> a lot of code between each file in a parent abstract class.
>
> I tried creating the following simple abstract class that will have a
> parameterized type of a case class that represents the schema being read in.
>
> This won't compile, it just complains about not being able to find an
> encoder, but I'm importing the implicits and don't believe this error.
>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> abstract class RawTable[A](inDir: String) {
>      |     def load() = {
>      |         spark.read
>      |             .option("header", "true")
>      |             .option("mode", "FAILFAST")
>      |             .option("escape", "\"")
>      |             .option("nullValue", "")
>      |             .option("indferSchema", "true")
>      |             .csv(inDir)
>      |             .as[A]
>      |     }
>      | }
> <console>:27: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>                    .as[A]
>
> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
> <console>:13: error: not found: type RawTable
>        class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>                       ^
>
> What's odd is that this output looks okay:
>
> scala> val RTEncoder = Encoders.product[RawTemp]
> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
> string]
>
> scala> RTEncoder.schema
> res4: org.apache.spark.sql.types.StructType = StructType(StructField(f1,StringType,true),
> StructField(f2,StringType,true), StructField(temp,LongType,false),
> StructField(created_at,TimestampType,true), StructField(data_filename,
> StringType,true))
>
> scala> RTEncoder.clsTag
> res5: scala.reflect.ClassTag[RawTemp] = RawTemp
>
> Any ideas?
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake <http://www.MailLaunder.com/>
> 800-733-2143 <(800)%20733-2143>
>

Mime
View raw message