spark-user mailing list archives

From Sadhan Sood <sadhan.s...@gmail.com>
Subject Re: SparkSQL exception on cached parquet table
Date Thu, 20 Nov 2014 13:54:07 GMT
I am seeing Spark SQL throw an exception when reading selected columns
from a Thrift-generated Parquet file, and also when caching the table.
On further digging, I was able to narrow it down to at least one
particular column type that triggers the issue: map<string, set<string>>.
To reproduce it, I created a test Thrift file with a very basic schema and
stored some sample data in a Parquet file:

Test.thrift
===========
typedef binary SomeId

enum SomeExclusionCause {
  WHITELIST = 1,
  HAS_PURCHASE = 2,
}

struct SampleThriftObject {
  10: string col_a;
  20: string col_b;
  30: string col_c;
  40: optional map<SomeExclusionCause, set<SomeId>> col_d;
}
=============

Then I loaded the data in Spark through a SchemaRDD:

import org.apache.spark.sql.SchemaRDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val parquetFile = "/path/to/generated/parquet/file"
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.printSchema
root
 |-- col_a: string (nullable = true)
 |-- col_b: string (nullable = true)
 |-- col_c: string (nullable = true)
 |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

parquetFileRDD.registerTempTable("test")
sqlContext.cacheTable("test")
sqlContext.sql("select col_a from test").collect() // <-- throws the exception below

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
... 27 more
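
One possible reading of the "Caused by" frames, offered as a guess rather
than a diagnosis: GroupColumnIO.getLast ends up calling ArrayList.get with
index -1, which is what happens if a group node is asked for its last child
while holding no children at all (for example, if a projection left a nested
group empty). The sketch below models only that mechanism. ColumnNode, Leaf,
Group and GetLastSketch are hypothetical stand-ins, not Parquet classes, and
the shape of `last` is an assumption about how getLast behaves.

```scala
import java.util.{ArrayList => JArrayList}

// Hypothetical stand-ins for a column IO tree; these are NOT Parquet's classes.
sealed trait ColumnNode { def last: Leaf }

final case class Leaf(name: String) extends ColumnNode {
  def last: Leaf = this
}

final case class Group(name: String, children: JArrayList[ColumnNode]) extends ColumnNode {
  // Assumed shape of getLast: descend into the child at index size - 1.
  // A group with zero children therefore asks the list for index -1.
  def last: Leaf = children.get(children.size - 1).last
}

object GetLastSketch {
  // Convenience builder so the examples below stay short.
  def group(name: String, kids: ColumnNode*): Group = {
    val list = new JArrayList[ColumnNode]()
    kids.foreach(k => list.add(k))
    Group(name, list)
  }

  // Right(leaf name) on success, Left(exception class name) on failure.
  // The supertype is caught so the sketch does not depend on which
  // IndexOutOfBoundsException subclass a given JDK throws.
  def lastLeaf(root: ColumnNode): Either[String, String] =
    try Right(root.last.name)
    catch { case e: IndexOutOfBoundsException => Left(e.getClass.getSimpleName) }

  def main(args: Array[String]): Unit = {
    val full   = group("root", Leaf("col_a"), group("col_d", Leaf("key")))
    val pruned = group("root", Leaf("col_a"), group("col_d")) // col_d has no children
    println(lastLeaf(full))
    println(lastLeaf(pruned))
  }
}
```

If this reading is right, the interesting question is why the requested
schema handed to the Parquet reader would contain an empty group when only
col_a is selected.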

If you remove col_d from the Thrift file, the problem goes away. The
problem also shows up when reading that particular column without
caching the table first. The same file can be dumped and read with
parquet-tools just fine. Here is the file dump from parquet-tools:

row group 0
--------------------------------------------------------------------------------
col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
col_d:
.map:
..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
..value:
...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...

    col_a TV=9 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

    col_b TV=9 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

    col_c TV=9 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

    col_d.map.key TV=9 RL=1 DL=2
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

    col_d.map.value.value_tuple TV=9 RL=2 DL=4
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

BINARY col_a
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:a1
value 2: R:1 D:1 V:a2
value 3: R:1 D:1 V:a3
value 4: R:1 D:1 V:a4
value 5: R:1 D:1 V:a5
value 6: R:1 D:1 V:a6
value 7: R:1 D:1 V:a7
value 8: R:1 D:1 V:a8
value 9: R:1 D:1 V:a9

BINARY col_b
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:b1
value 2: R:1 D:1 V:b2
value 3: R:1 D:1 V:b3
value 4: R:1 D:1 V:b4
value 5: R:1 D:1 V:b5
value 6: R:1 D:1 V:b6
value 7: R:1 D:1 V:b7
value 8: R:1 D:1 V:b8
value 9: R:1 D:1 V:b9

BINARY col_c
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:c1
value 2: R:1 D:1 V:c2
value 3: R:1 D:1 V:c3
value 4: R:1 D:1 V:c4
value 5: R:1 D:1 V:c5
value 6: R:1 D:1 V:c6
value 7: R:1 D:1 V:c7
value 8: R:1 D:1 V:c8
value 9: R:1 D:1 V:c9

BINARY col_d.map.key
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:0 V:<null>
value 4: R:0 D:0 V:<null>
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:0 V:<null>
value 9: R:0 D:0 V:<null>

BINARY col_d.map.value.value_tuple
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:0 V:<null>
value 4: R:0 D:0 V:<null>
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:0 V:<null>
value 9: R:0 D:0 V:<null>
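
A note on reading the col_d part of the dump: both col_d leaves report D:0
while their maximum definition levels are 2 and 4, which under Parquet's
definition-level rules means col_d itself is null in all nine rows. The
sketch below shows the arithmetic. The Node paths are an assumption about
the nesting, inferred from the DL values in the dump, and DefinitionLevels
is a hypothetical helper, not part of any library.

```scala
object DefinitionLevels {
  // One entry per schema node on the path from the root to a leaf column.
  // addsLevel is true for optional and repeated nodes, false for required ones.
  final case class Node(name: String, addsLevel: Boolean)

  // Maximum definition level of a leaf: the number of optional/repeated
  // nodes on its path.
  def maxLevel(path: Seq[Node]): Int = path.count(_.addsLevel)

  // Name of the first node that is not defined at the observed level,
  // or None when the value is fully defined.
  def firstUndefined(path: Seq[Node], observed: Int): Option[String] = {
    val levelAdding = path.filter(_.addsLevel)
    if (observed >= levelAdding.size) None else Some(levelAdding(observed).name)
  }

  // Assumed nesting for the two col_d leaves (matches DL=2 and DL=4 above).
  val keyPath: Seq[Node] =
    Seq(Node("col_d", true), Node("map", true), Node("key", false))
  val valueTuplePath: Seq[Node] =
    Seq(Node("col_d", true), Node("map", true), Node("value", true), Node("value_tuple", true))

  def main(args: Array[String]): Unit = {
    println(maxLevel(keyPath))
    println(maxLevel(valueTuplePath))
    println(firstUndefined(keyPath, 0))
    println(firstUndefined(valueTuplePath, 0))
  }
}
```

So the sample file exercises the nested schema but never an actual map
entry, which suggests the failure is tied to the schema rather than to the
stored values.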


I am happy to provide more information; any help is appreciated.


On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sadhan.sood@gmail.com> wrote:

> Hi Cheng,
>
> I tried reading the Parquet file (on which we were getting the exception)
> through parquet-tools, and it is able to dump the file and I can read the
> metadata, etc. I also loaded the file through a Hive table and can run a
> table scan query on it as well. Let me know if I can do more to help
> resolve the problem. I'll run it through a debugger and see if I can get
> more information on it in the meantime.
>
> Thanks,
> Sadhan
>
> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>
>>  (Forgot to cc user mail list)
>>
>>
>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>
>> Hey Sadhan,
>>
>>  Thanks for the additional information, this is helpful. It seems that some
>> Parquet internal contract was broken, but I'm not sure whether it's caused
>> by Spark SQL or Parquet, or maybe even the Parquet file itself was damaged
>> somehow. I'm investigating this. In the meantime, would you mind helping
>> to narrow down the problem by scanning exactly the same Parquet file
>> with some other systems (e.g. Hive or Impala)? If other systems work, then
>> there must be something wrong with Spark SQL.
>>
>>  Cheng
>>
>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sadhan.sood@gmail.com>
>> wrote:
>>
>>> Hi Cheng,
>>>
>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>
>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>         ... 26 more
>>>
>>>
>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <lian.cs.zju@gmail.com>
>>> wrote:
>>>
>>>>  Hi Sadhan,
>>>>
>>>> Could you please provide the stack trace of the
>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>> query succeeds is that Spark SQL doesn’t bother reading all data from the
>>>> table to give COUNT(*). In the second case, however, the whole table
>>>> is asked to be cached lazily via the cacheTable call, thus it’s
>>>> scanned to build the in-memory columnar cache. Then things went wrong while
>>>> scanning this LZO-compressed Parquet file. But unfortunately the stack
>>>> trace at hand doesn’t indicate the root cause.
>>>>
>>>> Cheng
>>>>
>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>
>>>> While testing SparkSQL on a bunch of parquet files (basically used to
>>>> be a partition for one of our hive tables), I encountered this error:
>>>>
>>>>  import org.apache.spark.sql.SchemaRDD
>>>> import org.apache.hadoop.fs.FileSystem;
>>>> import org.apache.hadoop.conf.Configuration;
>>>> import org.apache.hadoop.fs.Path;
>>>>
>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>
>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() // <-- works fine
>>>> sqlContext.cacheTable("xyz_20141109")
>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() // <-- fails with an exception
>>>>
>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>         at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>         at org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>         at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>>>         at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>>>         at org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>         at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>         at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
