spark-user mailing list archives

From Michael Albert <m_albert...@yahoo.com.INVALID>
Subject avro + parquet + vector<string> + NullPointerException while reading
Date Tue, 04 Nov 2014 03:33:53 GMT

Greetings!
I'm trying to use avro and parquet with the following schema:

{
    "name": "TestStruct",
    "namespace": "bughunt",
    "type": "record",
    "fields": [
        {
            "name": "string_array",
            "type": { "type": "array", "items": "string" }
        }
    ]
}
The writing process seems to be OK, but when I try to read it with Spark, I get:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
When I try to read it with Hive, I get this:

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable

That leads me to suspect this might be related to https://github.com/Parquet/parquet-mr/issues/281 ,
but that issue seems to be Hive-specific, whereas here Spark cannot read the data it
claims to have written itself.
I'm running on an Amazon EMR cluster using the "version 2.4.0" Hadoop code and Spark 1.1.0.
Has anyone else observed this sort of behavior?
For completeness, here is the code that writes the data:

package bughunt

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat

import java.util.ArrayList

object GenData {
    val outputPath = "/user/xxxxx/testdata"
    val words = List(
                    List("apple", "banana", "cherry"),
                    List("car", "boat", "plane"),
                    List("lion", "tiger", "bear"),
                    List("north", "south", "east", "west"),
                    List("up", "down", "left", "right"),
                    List("red", "green", "blue"))

    def main(args: Array[String]) {
        val conf = new SparkConf(true)
                    .setAppName("IngestLoanApplication")
                    //.set("spark.kryo.registrator",
                    //            classOf[CommonRegistrator].getName)
                    .set("spark.serializer",
                            "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", 4.toString)
                    .set("spark.kryo.referenceTracking", "false")

        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(words)

        val job = new Job(sc.hadoopConfiguration)

        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)

        rdd.map(p => {
                    val xs = new ArrayList[String]
                    for (z <- p) { xs.add(z) }
                    val bldr = TestStruct.newBuilder()
                    bldr.setStringArray(xs)
                    (null, bldr.build()) })
           .saveAsNewAPIHadoopFile(outputPath,
                classOf[Void],
                classOf[TestStruct],
                classOf[ParquetOutputFormat[TestStruct]],
                job.getConfiguration)
    }
}

To read the data, I use this sort of code from the spark-shell:

:paste

import bughunt.TestStruct

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
    val job = new Job(sc.hadoopConfiguration)

    ParquetInputFormat.setReadSupportClass(job,
            classOf[AvroReadSupport[TestStruct]])

    sc.newAPIHadoopFile("/user/malbert/testdata",
            classOf[ParquetInputFormat[TestStruct]],
            classOf[Void],
            classOf[TestStruct],
            job.getConfiguration)
}
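One workaround I've been considering (a sketch under my own assumptions, not something I've confirmed fixes this) is to copy each record's fields into plain Scala values inside the first map over the RDD, so that Kryo never has to serialize the Avro object itself. The helper name below is mine; Avro specific records hand back string arrays as java.util.List[CharSequence]:

```scala
import scala.collection.JavaConverters._

// Hypothetical helper: copy the Avro array (often Utf8 elements) into
// plain Scala strings before any Kryo serialization can happen.
def copyOut(stringArray: java.util.List[CharSequence]): Vector[String] =
  stringArray.asScala.map(_.toString).toVector

// At the call site it would look something like:
//   openRddSpecific(sc).map { case (_, rec) => copyOut(rec.getStringArray) }

println(copyOut(java.util.Arrays.asList[CharSequence]("lion", "tiger", "bear")))
// Vector(lion, tiger, bear)
```

Whether that sidesteps the NullPointerException above is exactly what I'd like to understand.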
I start the Spark shell as follows:

spark-shell \
    --jars ../my-jar-containing-the-class-definitions.jar \
    --conf mapreduce.user.classpath.first=true \
    --conf spark.kryo.referenceTracking=false \
    --conf spark.kryoserializer.buffer.mb=4 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

I'm stumped.  I can read and write records and maps, but arrays/vectors elude me.
Am I missing something obvious?
Thanks!
Sincerely, Mike Albert