spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-10536) filtered POJOs replaced by other instances after collect()
Date Thu, 10 Sep 2015 16:25:45 GMT

    [ https://issues.apache.org/jira/browse/SPARK-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739056#comment-14739056
] 

Sean Owen commented on SPARK-10536:
-----------------------------------

Tough one -- I think the behavior is not a surprise to people who have worked with Hadoop
MapReduce and its InputFormat, but otherwise it probably is: the record reader reuses a single
key/value object across records, so any reference you hold on to ends up pointing at whatever
was read last. It doesn't come up for many common value object types like a String or Integer.
IIRC there were a number of discussions about whether to specially clone the object and
'override' the Hadoop behavior by default, and incur the performance hit everywhere, but there
was no reliable way of finding a copy constructor for value objects.

The best thing is to copy off the object if you're going to hold any long-lived reference
to it. Here I would have thought you'd have to copy the datum field inside the Visitor
object?
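The reuse described above can be sketched without Spark or Avro at all. This is a minimal, hypothetical illustration (the `Record` class and `readAll` helper below are stand-ins, not Spark or Hadoop API): a reader that hands back the same mutable instance for every record leaves you with N aliases of the last record unless you copy off the fields you need.

```scala
// Copied-off, immutable snapshot of the fields we care about.
final case class Snapshot(eid: Long)

// Stand-in for the reused Avro/Hadoop datum: one mutable buffer object.
class Record(var eid: Long) {
  def copyOff(): Snapshot = Snapshot(eid) // explicit copy of the field we need
}

object ReusePitfall {
  // Simulates a RecordReader that reuses a single object across records;
  // `keep` decides what to retain for each record.
  def readAll(eids: Seq[Long], keep: Record => AnyRef): Seq[AnyRef] = {
    val reused = new Record(0L)
    eids.map { e => reused.eid = e; keep(reused) }
  }

  def main(args: Array[String]): Unit = {
    val eids = Seq(1L, 2L, 3L)

    // Holding the raw reference: every element aliases the same object,
    // which by now carries the last record's fields.
    val aliased = readAll(eids, identity)
    println(aliased.map(_.asInstanceOf[Record].eid))   // List(3, 3, 3)

    // Copying off the datum before keeping it preserves each record.
    val copied = readAll(eids, r => r.copyOff())
    println(copied.map(_.asInstanceOf[Snapshot].eid))  // List(1, 2, 3)
  }
}
```

In the reporter's pipeline the same principle applies: copy the datum out in the first map, before anything downstream (including collect()) holds on to it.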

> filtered POJOs replaced by other instances after collect()
> ----------------------------------------------------------
>
>                 Key: SPARK-10536
>                 URL: https://issues.apache.org/jira/browse/SPARK-10536
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Erik Schmiegelow
>
> I've encountered a very strange phenomenon with collect() in a simple program written for debugging purposes.
> The objective of the program is to filter objects that match an id and print their contents to stderr so that we can inspect them. Our initial plan was to have the driver do that, because we run our applications on a YARN cluster and we didn't want to have to track down the right executor instance before looking at the log files.
> We then discovered that the results after collect() didn't match the ids we had filtered for, so we added a few debugging statements to find out what happened. Interestingly enough, we get the correct instances when we inspect them in filter() or map() - on the executor. Once the results are sent back to the driver, the instances are swapped. More intriguingly, we always get the same set of incorrect instances.
> Here's the code:
> {code}
>     val rdd = sparkContext.newAPIHadoopFile(
>         input,
>         classOf[AvroKeyInputFormat[Visitor]],
>         classOf[AvroKey[Visitor]],
>         classOf[NullWritable])
>       .map(f => f._1.datum())
>       .filter { visitor =>
>         val result = visitor.eid == eid
>         if (result) {
>           println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
>           val mapper = new ObjectMapper()
>           System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
>         }
>         result
>       }
>       .map { f =>
>         val mapper = new ObjectMapper()
>         println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>         System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>         f
>       }
>       .collect()
>       .foreach { f =>
>         val mapper = new ObjectMapper()
>         println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>         System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>       }
> {code}
> The output we get in the Executor (filter + map) is as follows:
> {code}
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> {code}
> The output on the driver is this:
> {code}
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> {code}
> Some notes on the input: 
> - we use reflective Avro to serialize to HDFS
> - we've got about 5 GB of data



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

