spark-user mailing list archives

From Don Drake <dondr...@gmail.com>
Subject Re: Parameterized types and Datasets - Spark 2.1.0
Date Wed, 01 Feb 2017 23:34:55 GMT
Thanks for the reply.  I did try that syntax, [A : Encoder], yesterday,
but I kept getting this error in both spark-shell and Zeppelin.

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala>

scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at: java.sql.Timestamp, data_filename: String)
defined class RawTemp

scala>

scala> import spark.implicits._
import spark.implicits._

scala>

scala> abstract class RawTable[A : Encoder](inDir: String) {
     |     import spark.implicits._
     |     def load() = {
     |         import spark.implicits._
     |         spark.read
     |             .option("header", "true")
     |             .option("mode", "FAILFAST")
     |             .option("escape", "\"")
     |             .option("nullValue", "")
     |             .option("inferSchema", "true")
     |             .csv(inDir)
     |             .as[A]
     |     }
     | }
<console>:13: error: not found: type Encoder
       abstract class RawTable[A : Encoder](inDir: String) {
                                   ^
<console>:24: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
                   .as[A]


I gave it a try today in a Scala application and it seems to work.  Is this
a known issue in a spark-shell?

In my Scala application, this is being defined in a separate file, etc.
without direct access to a Spark session.

I had to add the following code snippet so the import spark.implicits._
would take effect:

// ugly hack to get around "Unable to find encoder" compile-time errors

import org.apache.spark.sql.{SQLContext, SQLImplicits}

private object myImplicits extends SQLImplicits {

  protected override def _sqlContext: SQLContext =
    MySparkSingleton.getCurrentSession().sqlContext

}

import myImplicits._

I found that in about the hundredth Stack Overflow post I read while
searching for this problem.  Is this the best way to let the implicits do
their thing?
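
A possible alternative to the SQLImplicits workaround, offered as a sketch
rather than a confirmed fix: .as[A] only needs an implicit Encoder[A], so the
context bound can carry it into the generic class, and the concrete encoder
can come from Encoders.product in the case class's companion object, which
does not need a session. Passing the SparkSession as a constructor parameter
is an assumption made here for illustration (the application above obtains
it from MySparkSingleton instead), and this has not been run against the
Spark 2.1.0 setup described in this thread.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

case class RawTemp(f1: String, f2: String, temp: Long,
                   created_at: java.sql.Timestamp, data_filename: String)

object RawTemp {
  // Lives in RawTemp's implicit scope, so the compiler finds it wherever an
  // Encoder[RawTemp] is required, without importing spark.implicits._
  implicit val encoder: Encoder[RawTemp] = Encoders.product[RawTemp]
}

// [A : Encoder] is shorthand for an extra implicit parameter of type
// Encoder[A]; it is resolved where A becomes concrete (the subclass),
// not inside this class, where A is still abstract.
// Assumption: the session is passed in explicitly instead of being global.
abstract class RawTable[A : Encoder](spark: SparkSession, inDir: String) {
  def load(): Dataset[A] =
    spark.read
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("escape", "\"")
      .option("nullValue", "")
      .option("inferSchema", "true")
      .csv(inDir)
      .as[A] // uses the Encoder[A] supplied by the context bound
}

// Encoder[RawTemp] is picked up from the RawTemp companion object here.
class TempTable(spark: SparkSession)
  extends RawTable[RawTemp](spark, "/user/drake/t.csv")
```

With this layout, neither file needs import spark.implicits._ for the
encoder; the trade-off is one implicit val per case class.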

Thanks.

-Don



On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust <michael@databricks.com>
wrote:

> You need to enforce that an Encoder is available for the type A using a context
> bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.
>
> import org.apache.spark.sql.Encoder
> abstract class RawTable[A : Encoder](inDir: String) {
>   ...
> }
>
> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake <dondrake@gmail.com> wrote:
>
>> I have a set of CSV files that I need to perform ETL on, with the plan to
>> re-use a lot of code across the files via a parent abstract class.
>>
>> I tried creating the following simple abstract class that will have a
>> parameterized type of a case class that represents the schema being read in.
>>
>> This won't compile; it just complains about not being able to find an
>> encoder, even though I'm importing the implicits, so I don't trust this
>> error.
>>
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at: java.sql.Timestamp, data_filename: String)
>> defined class RawTemp
>>
>> scala>
>>
>> scala> abstract class RawTable[A](inDir: String) {
>>      |     def load() = {
>>      |         spark.read
>>      |             .option("header", "true")
>>      |             .option("mode", "FAILFAST")
>>      |             .option("escape", "\"")
>>      |             .option("nullValue", "")
>>      |             .option("inferSchema", "true")
>>      |             .csv(inDir)
>>      |             .as[A]
>>      |     }
>>      | }
>> <console>:27: error: Unable to find encoder for type stored in a
>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>> classes) are supported by importing spark.implicits._  Support for
>> serializing other types will be added in future releases.
>>                    .as[A]
>>
>> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>> <console>:13: error: not found: type RawTable
>>        class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>>                       ^
>>
>> What's odd is that this output looks okay:
>>
>> scala> val RTEncoder = Encoders.product[RawTemp]
>> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
>> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
>> string]
>>
>> scala> RTEncoder.schema
>> res4: org.apache.spark.sql.types.StructType =
>> StructType(StructField(f1,StringType,true),
>> StructField(f2,StringType,true), StructField(temp,LongType,false),
>> StructField(created_at,TimestampType,true),
>> StructField(data_filename,StringType,true))
>>
>> scala> RTEncoder.clsTag
>> res5: scala.reflect.ClassTag[RawTemp] = RawTemp
>>
>> Any ideas?
>>
>> --
>> Donald Drake
>> Drake Consulting
>> http://www.drakeconsulting.com/
>> https://twitter.com/dondrake <http://www.MailLaunder.com/>
>> 800-733-2143
>>
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake <http://www.MailLaunder.com/>
800-733-2143
