spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Miller <cmiller11...@gmail.com>
Subject Re: Repeating Records w/ Spark + Avro?
Date Sat, 12 Mar 2016 08:15:13 GMT
Wow! That sure is buried in the documentation! But yeah, that's what I
thought more or less.

I tried copying as follows, but that didn't work.

*****************
val copyRDD = singleFileRDD.map(_.copy())
*****************

When I iterate over the new copyRDD (foreach or map), I still have the same
problem of duplicate records. I also tried copying within the block where
I'm using it, but that didn't work either:

*****************
rdd
  .take(10)
  .collect()
  .map(item => {
    val item = i.copy()
    val record = i._1.datum()

    println(record.get("myValue"))
  })
*****************

What am I doing wrong?

--
Chris Miller

On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohajeri@gmail.com>
wrote:

> Here is the reason for the behavior:
> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
> object for each record, directly caching the returned RDD or directly
> passing it to an aggregation or shuffle operation will create many
> references to the same object. If you plan to directly cache, sort, or
> aggregate Hadoop writable objects, you should first copy them using a map
>  function.
>
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>
> So it is Hadoop related.
>
> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11101@gmail.com>
> wrote:
>
>> I have a bit of a strange situation:
>>
>> *****************
>> import org.apache.avro.generic.{GenericData, GenericRecord}
>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>
>> val path = "/path/to/data.avro"
>>
>> val rdd = sc.newAPIHadoopFile(path,
>> classOf[AvroKeyInputFormat[GenericRecord]],
>> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>> rdd.take(10).foreach( x => println( x._1.datum() ))
>> *****************
>>
>> In this situation, I get the right number of records returned, and if I
>> look at the contents of rdd I see the individual records as tuple2's...
>> however, if I println on each one as shown above, I get the same result
>> every time.
>>
>> Apparently this has to do with something in Spark or Avro keeping a
>> reference to the item its iterating over, so I need to clone the object
>> before I use it. However, if I try to clone it (from the spark-shell
>> console), I get:
>>
>> *****************
>> rdd.take(10).foreach( x => {
>>   val clonedDatum = x._1.datum().clone()
>>   println(clonedDatum.datum())
>> })
>>
>> <console>:37: error: method clone in class Object cannot be accessed in
>> org.apache.avro.generic.GenericRecord
>>  Access to protected method clone not permitted because
>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>  class $iwC where the access take place
>>                 val clonedDatum = x._1.datum().clone()
>> *****************
>>
>> So, how can I clone the datum?
>>
>> Seems I'm not the only one who ran into this problem:
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>> can't figure out how to fix it in my case without hacking away like the
>> person in the linked PR did.
>>
>> Suggestions?
>>
>> --
>> Chris Miller
>>
>
>

Mime
View raw message