spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Armbrust <mich...@databricks.com>
Subject Re: SparkSQL exception on cached parquet table
Date Thu, 20 Nov 2014 18:37:53 GMT
Which version are you running on again?

On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood <sadhan.sood@gmail.com> wrote:

> Also attaching the parquet file if anyone wants to take a further look.
>
> On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sadhan.sood@gmail.com>
> wrote:
>
>> So, I am seeing this issue with spark sql throwing an exception when
>> trying to read selective columns from a thrift parquet file and also when
>> caching them:
>> On some further digging, I was able to narrow it down to at-least one
>> particular column type: map<string, set<string>> to be causing this issue.
>> To reproduce this I created a test thrift file with a very basic schema and
>> stored some sample data in a parquet file:
>>
>> Test.thrift
>> ===========
>> typedef binary SomeId
>>
>> enum SomeExclusionCause {
>>   WHITELIST = 1,
>>   HAS_PURCHASE = 2,
>> }
>>
>> struct SampleThriftObject {
>>   10: string col_a;
>>   20: string col_b;
>>   30: string col_c;
>>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
>> }
>> =============
>>
>> And loading the data in spark through schemaRDD:
>>
>> import org.apache.spark.sql.SchemaRDD
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>> val parquetFile = "/path/to/generated/parquet/file"
>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>> parquetFileRDD.printSchema
>> root
>>  |-- col_a: string (nullable = true)
>>  |-- col_b: string (nullable = true)
>>  |-- col_c: string (nullable = true)
>>  |-- col_d: map (nullable = true)
>>  |    |-- key: string
>>  |    |-- value: array (valueContainsNull = true)
>>  |    |    |-- element: string (containsNull = false)
>>
>> parquetFileRDD.registerTempTable("test")
>> sqlContext.cacheTable("test")
>> sqlContext.sql("select col_a from test").collect() <-- see the exception
>> stack here
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
>> value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
>> at
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>> at
>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>> at
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>> at
>> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>> at
>> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>> at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>> at
>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>> at
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>> ... 27 more
>>
>> If you take out the col_d from the thrift file, the problem goes away.
>> The problem also shows up when trying to read the particular column without
>> caching the table first. The same file can be dumped/read using
>> parquet-tools just fine. Here is the file dump using parquet-tools:
>>
>> row group 0
>> --------------------------------------------------------------------------------
>> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
>> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
>> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
>> col_d:
>> .map:
>> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
>> ..value:
>> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>>
>>     col_a TV=9 RL=0 DL=1
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>
>>     col_b TV=9 RL=0 DL=1
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>
>>     col_c TV=9 RL=0 DL=1
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>
>>     col_d.map.key TV=9 RL=1 DL=2
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>
>>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>>     ----------------------------------------------------------------------------
>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>
>> BINARY col_a
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:1 D:1 V:a1
>> value 2: R:1 D:1 V:a2
>> value 3: R:1 D:1 V:a3
>> value 4: R:1 D:1 V:a4
>> value 5: R:1 D:1 V:a5
>> value 6: R:1 D:1 V:a6
>> value 7: R:1 D:1 V:a7
>> value 8: R:1 D:1 V:a8
>> value 9: R:1 D:1 V:a9
>>
>> BINARY col_b
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:1 D:1 V:b1
>> value 2: R:1 D:1 V:b2
>> value 3: R:1 D:1 V:b3
>> value 4: R:1 D:1 V:b4
>> value 5: R:1 D:1 V:b5
>> value 6: R:1 D:1 V:b6
>> value 7: R:1 D:1 V:b7
>> value 8: R:1 D:1 V:b8
>> value 9: R:1 D:1 V:b9
>>
>> BINARY col_c
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:1 D:1 V:c1
>> value 2: R:1 D:1 V:c2
>> value 3: R:1 D:1 V:c3
>> value 4: R:1 D:1 V:c4
>> value 5: R:1 D:1 V:c5
>> value 6: R:1 D:1 V:c6
>> value 7: R:1 D:1 V:c7
>> value 8: R:1 D:1 V:c8
>> value 9: R:1 D:1 V:c9
>>
>> BINARY col_d.map.key
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:0 D:0 V:<null>
>> value 2: R:0 D:0 V:<null>
>> value 3: R:0 D:0 V:<null>
>> value 4: R:0 D:0 V:<null>
>> value 5: R:0 D:0 V:<null>
>> value 6: R:0 D:0 V:<null>
>> value 7: R:0 D:0 V:<null>
>> value 8: R:0 D:0 V:<null>
>> value 9: R:0 D:0 V:<null>
>>
>> BINARY col_d.map.value.value_tuple
>> --------------------------------------------------------------------------------
>> *** row group 1 of 1, values 1 to 9 ***
>> value 1: R:0 D:0 V:<null>
>> value 2: R:0 D:0 V:<null>
>> value 3: R:0 D:0 V:<null>
>> value 4: R:0 D:0 V:<null>
>> value 5: R:0 D:0 V:<null>
>> value 6: R:0 D:0 V:<null>
>> value 7: R:0 D:0 V:<null>
>> value 8: R:0 D:0 V:<null>
>> value 9: R:0 D:0 V:<null>
>>
>>
>> I am happy to provide more information but any help is appreciated.
>>
>>
>> On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sadhan.sood@gmail.com>
>> wrote:
>>
>>> Hi Cheng,
>>>
>>> I tried reading the parquet file(on which we were getting the exception)
>>> through parquet-tools and it is able to dump the file and I can read the
>>> metadata, etc. I also loaded the file through hive table and can run a
>>> table scan query on it as well. Let me know if I can do more to help
>>> resolve the problem, I'll run it through a debugger and see if I can get
>>> more information on it in the meantime.
>>>
>>> Thanks,
>>> Sadhan
>>>
>>> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <lian.cs.zju@gmail.com>
>>> wrote:
>>>
>>>>  (Forgot to cc user mail list)
>>>>
>>>>
>>>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>>>
>>>> Hey Sadhan,
>>>>
>>>>  Thanks for the additional information, this is helpful. Seems that
>>>> some Parquet internal contract was broken, but I'm not sure whether it's
>>>> caused by Spark SQL or Parquet, or even maybe the Parquet file itself was
>>>> damaged somehow. I'm investigating this. In the meanwhile, would you mind
>>>> to help to narrow down the problem by trying to scan exactly the same
>>>> Parquet file with some other systems (e.g. Hive or Impala)? If other
>>>> systems work, then there must be something wrong with Spark SQL.
>>>>
>>>>  Cheng
>>>>
>>>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sadhan.sood@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Cheng,
>>>>>
>>>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>>>
>>>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>>         ... 26 more
>>>>>
>>>>>
>>>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <lian.cs.zju@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>  Hi Sadhan,
>>>>>>
>>>>>> Could you please provide the stack trace of the
>>>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>>>> query succeeds is that Spark SQL doesn’t bother reading all data
from the
>>>>>> table to give COUNT(*). In the second case, however, the whole table
>>>>>> is asked to be cached lazily via the cacheTable call, thus it’s
>>>>>> scanned to build the in-memory columnar cache. Then thing went wrong
while
>>>>>> scanning this LZO compressed Parquet file. But unfortunately the
stack
>>>>>> trace at hand doesn’t indicate the root cause.
>>>>>>
>>>>>> Cheng
>>>>>>
>>>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>>>
>>>>>> While testing SparkSQL on a bunch of parquet files (basically used
to
>>>>>> be a partition for one of our hive tables), I encountered this error:
>>>>>>
>>>>>>  import org.apache.spark.sql.SchemaRDD
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>
>>>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>>
>>>>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>>> works fine
>>>>>> sqlContext.cacheTable("xyz_20141109")
>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <--
>>>>>> fails with an exception
>>>>>>
>>>>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in
>>>>>> block -1 in file
>>>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>>>
>>>>>>         at
>>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>>>
>>>>>>         at
>>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>
>>>>>>         at
>>>>>> scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>>>>>
>>>>>>         at
>>>>>> scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>
>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>
>>>>>>         at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>>>
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>>
>>>>>>   ​
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

Mime
View raw message