spark-user mailing list archives

From James Hammerton <ja...@gluru.co>
Subject DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Date Fri, 04 Mar 2016 12:01:50 GMT
Hi,

I've come across some strange behaviour with Spark 1.6.0.

In the code below, the filtering on "eventName" only seems to work if I
call .cache on the resulting DataFrame.

If I don't do this, the code crashes inside the UDF because it processes an
event that the filter should have got rid of.

Any ideas why this might be the case?

The code is as follows:

    val df = sqlContext.read.parquet(inputPath)
    val filtered = df.filter(df("eventName").equalTo(Created))
    val extracted = extractEmailReferences(sqlContext, filtered.cache)
    // Caching seems to be required for the filter to work
    extracted.write.parquet(outputPath)

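For what it's worth, one way to see whether the optimizer is evaluating the
UDF before (or merged with) the filter is to compare the physical plans with
and without the .cache call. This is just a diagnostic sketch against the
same variables as above:

    // Diagnostic sketch: print the extended plans with and without caching.
    // If the UDF projection sits below the Filter in the uncached plan, the
    // UDF is being run on rows the filter was meant to remove.
    val filteredNoCache = df.filter(df("eventName").equalTo(Created))
    extractEmailReferences(sqlContext, filteredNoCache).explain(true)
    extractEmailReferences(sqlContext, filteredNoCache.cache).explain(true)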

where extractEmailReferences does this:

    def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
      val extracted = df.select(df(EventFieldNames.ObjectId),
        extractReferencesUDF(df(EventFieldNames.EventJson),
          df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")

      extracted.filter(extracted("references").notEqual("UNKNOWN"))
    }


and extractReferencesUDF:

    def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))

    def extractReferences(eventJson: String, objectId: String, userId: String): String = {
      import org.json4s.jackson.Serialization
      import org.json4s.NoTypeHints
      implicit val formats = Serialization.formats(NoTypeHints)

      val created = Serialization.read[GMailMessage.Created](eventJson)
      // This is where the code crashes if the .cache isn't called
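
In the meantime I can work around it by making the UDF tolerant of rows the
filter was expected to remove. This is only a workaround sketch (the
extractReferencesSafe name is mine, not in the code above): it returns
"UNKNOWN" on any parse failure, which the existing .notEqual("UNKNOWN")
filter already discards.

    import scala.util.control.NonFatal

    // Workaround sketch: swallow parse failures on unexpected events and
    // return the sentinel that the downstream filter already removes.
    def extractReferencesSafe(eventJson: String, objectId: String, userId: String): String =
      try {
        extractReferences(eventJson, objectId, userId)
      } catch {
        case NonFatal(_) => "UNKNOWN"
      }

    def extractReferencesSafeUDF = udf(extractReferencesSafe(_: String, _: String, _: String))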

 Regards,

James
