spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sadhan Sood <sadhan.s...@gmail.com>
Subject Re: SparkSQL exception on cached parquet table
Date Thu, 20 Nov 2014 16:17:44 GMT
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sadhan.sood@gmail.com> wrote:

> So, I am seeing this issue with spark sql throwing an exception when
> trying to read selective columns from a thrift parquet file and also when
> caching them:
> On some further digging, I was able to narrow it down to at-least one
> particular column type: map<string, set<string>> to be causing this issue.
> To reproduce this I created a test thrift file with a very basic schema and
> stored some sample data in a parquet file:
>
> Test.thrift
> ===========
> typedef binary SomeId
>
> enum SomeExclusionCause {
>   WHITELIST = 1,
>   HAS_PURCHASE = 2,
> }
>
> struct SampleThriftObject {
>   10: string col_a;
>   20: string col_b;
>   30: string col_c;
>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
> }
> =============
>
> And loading the data in spark through schemaRDD:
>
> import org.apache.spark.sql.SchemaRDD
> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
> val parquetFile = "/path/to/generated/parquet/file"
> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
> parquetFileRDD.printSchema
> root
>  |-- col_a: string (nullable = true)
>  |-- col_b: string (nullable = true)
>  |-- col_c: string (nullable = true)
>  |-- col_d: map (nullable = true)
>  |    |-- key: string
>  |    |-- value: array (valueContainsNull = true)
>  |    |    |-- element: string (containsNull = false)
>
> parquetFileRDD.registerTempTable("test")
> sqlContext.cacheTable("test")
> sqlContext.sql("select col_a from test").collect() <-- see the exception
> stack here
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
> value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
> at
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
> at
> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
> at
> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
> at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
> at
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
> ... 27 more
>
> If you take out the col_d from the thrift file, the problem goes away. The
> problem also shows up when trying to read the particular column without
> caching the table first. The same file can be dumped/read using
> parquet-tools just fine. Here is the file dump using parquet-tools:
>
> row group 0
> --------------------------------------------------------------------------------
> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
> col_d:
> .map:
> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
> ..value:
> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>
>     col_a TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>
>     col_b TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>
>     col_c TV=9 RL=0 DL=1
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>
>     col_d.map.key TV=9 RL=1 DL=2
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>
>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>     ----------------------------------------------------------------------------
>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>
> BINARY col_a
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:1 D:1 V:a1
> value 2: R:1 D:1 V:a2
> value 3: R:1 D:1 V:a3
> value 4: R:1 D:1 V:a4
> value 5: R:1 D:1 V:a5
> value 6: R:1 D:1 V:a6
> value 7: R:1 D:1 V:a7
> value 8: R:1 D:1 V:a8
> value 9: R:1 D:1 V:a9
>
> BINARY col_b
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:1 D:1 V:b1
> value 2: R:1 D:1 V:b2
> value 3: R:1 D:1 V:b3
> value 4: R:1 D:1 V:b4
> value 5: R:1 D:1 V:b5
> value 6: R:1 D:1 V:b6
> value 7: R:1 D:1 V:b7
> value 8: R:1 D:1 V:b8
> value 9: R:1 D:1 V:b9
>
> BINARY col_c
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:1 D:1 V:c1
> value 2: R:1 D:1 V:c2
> value 3: R:1 D:1 V:c3
> value 4: R:1 D:1 V:c4
> value 5: R:1 D:1 V:c5
> value 6: R:1 D:1 V:c6
> value 7: R:1 D:1 V:c7
> value 8: R:1 D:1 V:c8
> value 9: R:1 D:1 V:c9
>
> BINARY col_d.map.key
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:0 D:0 V:<null>
> value 2: R:0 D:0 V:<null>
> value 3: R:0 D:0 V:<null>
> value 4: R:0 D:0 V:<null>
> value 5: R:0 D:0 V:<null>
> value 6: R:0 D:0 V:<null>
> value 7: R:0 D:0 V:<null>
> value 8: R:0 D:0 V:<null>
> value 9: R:0 D:0 V:<null>
>
> BINARY col_d.map.value.value_tuple
> --------------------------------------------------------------------------------
> *** row group 1 of 1, values 1 to 9 ***
> value 1: R:0 D:0 V:<null>
> value 2: R:0 D:0 V:<null>
> value 3: R:0 D:0 V:<null>
> value 4: R:0 D:0 V:<null>
> value 5: R:0 D:0 V:<null>
> value 6: R:0 D:0 V:<null>
> value 7: R:0 D:0 V:<null>
> value 8: R:0 D:0 V:<null>
> value 9: R:0 D:0 V:<null>
>
>
> I am happy to provide more information but any help is appreciated.
>
>
> On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sadhan.sood@gmail.com>
> wrote:
>
>> Hi Cheng,
>>
>> I tried reading the parquet file(on which we were getting the exception)
>> through parquet-tools and it is able to dump the file and I can read the
>> metadata, etc. I also loaded the file through hive table and can run a
>> table scan query on it as well. Let me know if I can do more to help
>> resolve the problem, I'll run it through a debugger and see if I can get
>> more information on it in the meantime.
>>
>> Thanks,
>> Sadhan
>>
>> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <lian.cs.zju@gmail.com>
>> wrote:
>>
>>>  (Forgot to cc user mail list)
>>>
>>>
>>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>>
>>> Hey Sadhan,
>>>
>>>  Thanks for the additional information, this is helpful. Seems that
>>> some Parquet internal contract was broken, but I'm not sure whether it's
>>> caused by Spark SQL or Parquet, or even maybe the Parquet file itself was
>>> damaged somehow. I'm investigating this. In the meanwhile, would you mind
>>> to help to narrow down the problem by trying to scan exactly the same
>>> Parquet file with some other systems (e.g. Hive or Impala)? If other
>>> systems work, then there must be something wrong with Spark SQL.
>>>
>>>  Cheng
>>>
>>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sadhan.sood@gmail.com>
>>> wrote:
>>>
>>>> Hi Cheng,
>>>>
>>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>>
>>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>         ... 26 more
>>>>
>>>>
>>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <lian.cs.zju@gmail.com>
>>>> wrote:
>>>>
>>>>>  Hi Sadhan,
>>>>>
>>>>> Could you please provide the stack trace of the
>>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>>> query succeeds is that Spark SQL doesn’t bother reading all data from
the
>>>>> table to give COUNT(*). In the second case, however, the whole table
>>>>> is asked to be cached lazily via the cacheTable call, thus it’s
>>>>> scanned to build the in-memory columnar cache. Then thing went wrong
while
>>>>> scanning this LZO compressed Parquet file. But unfortunately the stack
>>>>> trace at hand doesn’t indicate the root cause.
>>>>>
>>>>> Cheng
>>>>>
>>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>>
>>>>> While testing SparkSQL on a bunch of parquet files (basically used to
>>>>> be a partition for one of our hive tables), I encountered this error:
>>>>>
>>>>>  import org.apache.spark.sql.SchemaRDD
>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>> import org.apache.hadoop.conf.Configuration;
>>>>> import org.apache.hadoop.fs.Path;
>>>>>
>>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>
>>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>> works fine
>>>>> sqlContext.cacheTable("xyz_20141109")
>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>> fails with an exception
>>>>>
>>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in
>>>>> block -1 in file
>>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>>
>>>>>         at
>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>>
>>>>>         at
>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>>
>>>>>         at
>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>
>>>>>         at
>>>>> scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>>>>
>>>>>         at
>>>>> scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>>>>
>>>>>         at
>>>>> org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>
>>>>>         at
>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>>
>>>>>         at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>>
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>>   ​
>>>>>
>>>>
>>>>
>>>
>>>
>>
>

Mime
View raw message