spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: AvroFiles
Date Thu, 07 May 2015 05:48:59 GMT
Hello,
This is how I read and write Avro data.

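import java.util.Date  // assuming java.util.Date for the date-range arguments
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroJob
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD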

// Read
def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date,
endDate: Date) = {
    // getInputPaths is a helper (not shown), assumed to return the input
    // path(s) for the date range as a single comma-separated String.
    val path = getInputPaths(inputDir, startDate, endDate)
    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path)
}

// Write
protected def writeOutputRecords(detailRecords:
RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
    val writeJob = new Job()
    // Schema of data you need to write.
    val schema = SchemaUtil.viewItemOutputSchema
    AvroJob.setOutputKeySchema(writeJob, schema)
    detailRecords.saveAsNewAPIHadoopFile(outputDir,
      classOf[AvroKey[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      classOf[AvroKeyOutputFormat[GenericRecord]],
      writeJob.getConfiguration)
}


<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
    <scope>provided</scope>
</dependency>

It works for me. I do not see an NPE.
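
In case it helps, here is a minimal sketch of consuming the RDD returned by readGenericRecords above. The helper name extractField and its fieldName parameter are hypothetical, added only for illustration; substitute a field that exists in your own schema.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
import org.apache.spark.rdd.RDD

// Hypothetical helper (not part of the code above): pull one field out of
// each Avro record as a String.
def extractField(records: RDD[(AvroKey[GenericRecord], NullWritable)],
                 fieldName: String): RDD[String] =
  records.map { case (key, _) =>
    // key.datum() is the GenericRecord. Convert the value to a String right
    // away, because Hadoop input formats may reuse the same record object
    // for successive rows.
    String.valueOf(key.datum().get(fieldName))
  }
```

Converting to plain values inside the map also means the Avro record objects themselves, which are not Serializable, never have to be shuffled or cached.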

On Wed, May 6, 2015 at 7:50 AM, Pankaj Deshpande <ppankd@gmail.com> wrote:

> I am not using Kryo. I was using the regular sqlcontext.avrofiles to open.
> The files load properly with the schema. The exception happens when I try to
> read them. I will try the Kryo serializer and see if that helps.
> On May 5, 2015 9:02 PM, "Todd Nist" <tsindotg@gmail.com> wrote:
>
>> Are you using Kryo or Java serialization? I found this post useful:
>>
>>
>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>
>> If using Kryo, you need to register the classes with Kryo, something like
>> this:
>>
>>
>>     sc.registerKryoClasses(Array(
>>         classOf[ConfigurationProperty],
>>         classOf[Event]
>>     ))
>>
>> Or create a registrator something like this:
>>
>> class ODSKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>>     kryo.register(classOf[ConfigurationProperty], new AvroSerializer[ConfigurationProperty]())
>>     kryo.register(classOf[Event], new AvroSerializer[Event]())
>>   }
>> }
>>
>> I encountered a similar error since several of the Avro core classes are
>> not marked Serializable.
>>
>> HTH.
>>
>> Todd
>>
>> On Tue, May 5, 2015 at 7:09 PM, Pankaj Deshpande <ppankd@gmail.com>
>> wrote:
>>
>>> Hi, I am using Spark 1.3.1 to read an Avro file stored on HDFS. The Avro
>>> file was created using Avro 1.7.7, similar to the example mentioned in
>>> http://www.infoobjects.com/spark-with-avro/
>>> I am getting a NullPointerException on Schema read. It could be an Avro
>>> version mismatch. Has anybody had a similar issue with Avro?
>>>
>>>
>>> Thanks
>>>
>>
>>


-- 
Deepak
