spark-user mailing list archives

From James Hammerton <ja...@gluru.co>
Subject Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0
Date Mon, 07 Mar 2016 10:54:11 GMT
Hi,

So I managed to isolate the bug and I'm ready to try raising a JIRA issue.
I joined the Apache Jira project so I can create tickets.

However, when I click Create from the Spark project home page on JIRA, it
asks me to click on one of the following service desks: Kylin, Atlas,
Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
raise an issue for Spark?!

Regards,

James


On 4 March 2016 at 14:03, James Hammerton <james@gluru.co> wrote:

> Sure thing, I'll see if I can isolate this.
>
> Regards.
>
> James
>
> On 4 March 2016 at 12:24, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> If you can reproduce the following with a unit test, I suggest you open a
>> JIRA.
>>
>> Thanks
>>
>> On Mar 4, 2016, at 4:01 AM, James Hammerton <james@gluru.co> wrote:
>>
>> Hi,
>>
>> I've come across some strange behaviour with Spark 1.6.0.
>>
>> In the code below, the filtering by "eventName" only seems to work if I
>> call .cache on the resulting DataFrame.
>>
>> If I don't do this, the code crashes inside the UDF because it processes
>> an event that the filter should get rid of.
>>
>> Any ideas why this might be the case?
>>
>> The code is as follows:
>>
>>>       val df = sqlContext.read.parquet(inputPath)
>>>       val filtered = df.filter(df("eventName").equalTo(Created))
>>>       // Caching seems to be required for the filter to work
>>>       val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>>       extracted.write.parquet(outputPath)
>>
>>
>> where extractEmailReferences does this:
>>
>>> def extractEmailReferences(sqlContext: SQLContext,
>>>                            df: DataFrame): DataFrame = {
>>>     val extracted = df.select(df(EventFieldNames.ObjectId),
>>>       extractReferencesUDF(df(EventFieldNames.EventJson),
>>>         df(EventFieldNames.ObjectId),
>>>         df(EventFieldNames.UserId)) as "references")
>>>
>>>     extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>> }
>>
>>
>> and extractReferencesUDF:
>>
>>> def extractReferencesUDF =
>>>     udf(extractReferences(_: String, _: String, _: String))
>>>
>>> def extractReferences(eventJson: String, objectId: String,
>>>                       userId: String): String = {
>>>     import org.json4s.jackson.Serialization
>>>     import org.json4s.NoTypeHints
>>>     implicit val formats = Serialization.formats(NoTypeHints)
>>>
>>>     // This is where the code crashes if the .cache isn't called
>>>     val created = Serialization.read[GMailMessage.Created](eventJson)
>>
>>
>>  Regards,
>>
>> James
>>
>>
>
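
Until the cause is pinned down, one possible stopgap is to make the extraction function tolerate rows the filter should have removed, rather than throwing inside the UDF. The following is only a minimal sketch under stated assumptions: `parseCreated` is a hypothetical stand-in for the `Serialization.read[GMailMessage.Created](eventJson)` call quoted above, and `extractReferencesSafe` is an illustrative name, not part of the original code. It reuses the "UNKNOWN" value that `extractEmailReferences` already filters out.

```scala
import scala.util.Try

object SafeExtract {
  // Sentinel value that extractEmailReferences already filters out downstream.
  val Unknown = "UNKNOWN"

  // Hypothetical stand-in for Serialization.read[GMailMessage.Created](eventJson):
  // accepts only payloads that look like a "Created" event and throws otherwise.
  def parseCreated(eventJson: String): String = {
    require(eventJson != null && eventJson.contains("\"Created\""),
      "not a Created event")
    eventJson
  }

  // Wrap the parse in Try so an unexpected event yields the sentinel
  // instead of an exception that fails the task.
  def extractReferencesSafe(eventJson: String): String =
    Try(parseCreated(eventJson)).getOrElse(Unknown)
}
```

This would hide the symptom rather than explain why the filter appears not to be applied, so it would not replace a reproducible test case for the JIRA report.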