spark-user mailing list archives

From Chris Miller <cmiller11...@gmail.com>
Subject Re: Avro SerDe Issue w/ Manual Partitions?
Date Mon, 07 Mar 2016 02:50:20 GMT
For anyone running into this same issue, it looks like Avro deserialization
is just broken when used with SparkSQL and partitioned tables. I created
a bug report with details and a simplified example of how to reproduce it:
https://issues.apache.org/jira/browse/SPARK-13709
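
Until that's fixed, one possible workaround (untested against SPARK-13709, and
only viable if you control the writer schema) follows from Igor's observation
below: declare the enum field as a union with null, which is the shape the
"expecting union" resolution path is looking for. A minimal sketch of the
reshaped field declaration, in plain Python:

```python
import json

# Field declaration as written in test2.avsc: a bare enum type.
bare_enum_field = {
    "name": "enum1",
    "type": {"type": "enum", "name": "enum1_values",
             "symbols": ["BLUE", "RED", "GREEN"]},
}

# Hypothetical workaround: wrap the enum in a union with "null".
# Per Avro's schema-resolution rules, a non-union writer schema can still
# be read with a union reader schema, so existing bare-enum files should
# remain readable; files written under the new schema encode an extra
# union branch index.
nullable_enum_field = {
    "name": "enum1",
    "type": ["null", bare_enum_field["type"]],
    "default": None,
}

print(json.dumps(nullable_enum_field["type"]))
```

In DDL terms this would just mean pointing avro.schema.url at the revised
.avsc. Whether it actually sidesteps Spark's re-encoding path is an open
question, so treat it as a sketch, not a fix.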


--
Chris Miller

On Fri, Mar 4, 2016 at 12:11 AM, Chris Miller <cmiller11101@gmail.com>
wrote:

> One more thing -- just to set aside any question about my specific schema
> or data, I used the sample schema and data record from Oracle's
> documentation on Avro support. It's a pretty simple schema:
> https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html
>
> When I create a table with this schema and then try to query the
> Avro-encoded record, I get the same type of error:
>
> ********************
>  org.apache.avro.AvroTypeException: Found avro.FullName, expecting union
>     at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>     at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>     at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>     at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>     at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>     at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>     at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>     at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
>     at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
>     at
> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
>     at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
>     at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>     at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>     at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>     at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> ********************
>
> To me, this "feels" like a bug -- I just can't identify whether it's a Spark
> issue or an Avro issue. Decoding the same files works fine with Hive, and I
> imagine the same deserializer code is used there too.
>
> Thoughts?
>
> --
> Chris Miller
>
> On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.berman@gmail.com> wrote:
>
>> Your field's type name is
>> *enum1_values*
>>
>> but your data has
>> { "foo1": "test123", *"enum1"*: "BLUE" }
>>
>> i.e. since you defined a bare enum and not union(null, enum),
>> it tries to find a value for enum1_values and doesn't find one...
>>
>> On 3 March 2016 at 11:30, Chris Miller <cmiller11101@gmail.com> wrote:
>>
>>> I've been digging into this a little deeper. Here's what I've found:
>>>
>>> test1.avsc:
>>> ********************
>>> {
>>>   "namespace": "com.cmiller",
>>>   "name": "test1",
>>>   "type": "record",
>>>   "fields": [
>>>     { "name":"foo1", "type":"string" }
>>>   ]
>>> }
>>> ********************
>>>
>>> test2.avsc:
>>> ********************
>>> {
>>>   "namespace": "com.cmiller",
>>>   "name": "test1",
>>>   "type": "record",
>>>   "fields": [
>>>     { "name":"foo1", "type":"string" },
>>>     { "name":"enum1", "type": { "type":"enum", "name":"enum1_values",
>>> "symbols":["BLUE","RED", "GREEN"]} }
>>>   ]
>>> }
>>> ********************
>>>
>>> test1.json (encoded and saved to test/test1.avro):
>>> ********************
>>> { "foo1": "test123" }
>>> ********************
>>>
>>> test2.json (encoded and saved to test/test2.avro):
>>> ********************
>>> { "foo1": "test123", "enum1": "BLUE" }
>>> ********************
>>>
>>> Here is how I create the tables and add the data:
>>>
>>> ********************
>>> CREATE TABLE test1
>>> PARTITIONED BY (ds STRING)
>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>> STORED AS INPUTFORMAT
>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>> OUTPUTFORMAT
>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test1.avsc');
>>>
>>> ALTER TABLE test1 ADD PARTITION (ds='1') LOCATION
>>> 's3://spark-data/dev/test1';
>>>
>>>
>>> CREATE TABLE test2
>>> PARTITIONED BY (ds STRING)
>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>> STORED AS INPUTFORMAT
>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>> OUTPUTFORMAT
>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test2.avsc');
>>>
>>> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
>>> 's3://spark-data/dev/test2';
>>> ********************
>>>
>>> And here's what I get:
>>>
>>> ********************
>>> SELECT * FROM test1;
>>> -- works fine, shows data
>>>
>>> SELECT * FROM test2;
>>>
>>> org.apache.avro.AvroTypeException: Found com.cmiller.enum1_values,
>>> expecting union
>>>     at
>>> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>     at
>>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>>>     at
>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>>>     at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>>>     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>>>     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>>>     at
>>> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
>>>     at
>>> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
>>>     at
>>> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
>>>     at
>>> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
>>>     at
>>> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>     at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>     at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>     at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>     at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>     at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>     at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>     at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>     at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>     at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> ********************
>>>
>>> In addition to the above, I also tried putting the test Avro files on
>>> HDFS instead of S3 -- the error is the same. I also tried querying from
>>> Scala instead of using Zeppelin, and I get the same error.
>>>
>>> Where should I begin with troubleshooting this problem? This same query
>>> runs fine on Hive. Based on the error, it appears to be something in the
>>> deserializer though... but if it were a bug in the Avro deserializer, why
>>> does it only appear with Spark? I imagine Hive queries would be using the
>>> same deserializer, no?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> Chris Miller
>>>
>>> On Thu, Mar 3, 2016 at 4:33 AM, Chris Miller <cmiller11101@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a strange issue occurring when I use manual partitions.
>>>>
>>>> If I create a table as follows, I am able to query the data with no
>>>> problem:
>>>>
>>>> ********
>>>> CREATE TABLE test1
>>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>>> STORED AS INPUTFORMAT
>>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>>> OUTPUTFORMAT
>>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>>> LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/'
>>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');
>>>> ********
>>>>
>>>> If I create the table like this, however, and then add a partition with
>>>> a LOCATION specified, I am unable to query:
>>>>
>>>> ********
>>>> CREATE TABLE test2
>>>> PARTITIONED BY (ds STRING)
>>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>>> STORED AS INPUTFORMAT
>>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>>> OUTPUTFORMAT
>>>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');
>>>>
>>>> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
>>>> 's3://analytics-bucket/prod/logs/avro/2016/03/02/';
>>>> ********
>>>>
>>>> This is what happens:
>>>>
>>>> ********
>>>> SELECT * FROM test2 LIMIT 1;
>>>>
>>>> org.apache.avro.AvroTypeException: Found ActionEnum, expecting union
>>>>     at
>>>> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>     at
>>>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>>>>     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>>>>     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>>>>     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>>>>     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>>>>     at
>>>> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
>>>>     at
>>>> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
>>>>     at
>>>> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
>>>>     at
>>>> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
>>>>     at
>>>> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>     at
>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>     at
>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>     at
>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>     at
>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>     at
>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>     at
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>>     at
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>>     at
>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>     at
>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>     at
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>     at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> ********
>>>>
>>>> The data is exactly the same, and I can still go back and query the
>>>> test1 table without issue. I don't have control over the directory
>>>> structure, so I need to add the partitions manually so that I can
>>>> specify a location.
>>>>
>>>> For what it's worth, "ActionEnum" is the first field in my schema. This
>>>> same table and query structure works fine with Hive. When I try to run this
>>>> with SparkSQL, however, I get the above error.
>>>>
>>>> Anyone have any idea what the problem is here? Thanks!
>>>>
>>>> --
>>>> Chris Miller
>>>>
>>>
>>>
>>
>
