spark-user mailing list archives

From Sadhan Sood <sadhan.s...@gmail.com>
Subject Re: SparkSQL exception on cached parquet table
Date Thu, 20 Nov 2014 18:39:45 GMT
I am running on master (pulled yesterday, I believe), but I saw the same
issue with 1.2.0.

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust <michael@databricks.com>
wrote:

> Which version are you running on again?
>
> On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood <sadhan.sood@gmail.com>
> wrote:
>
>> Also attaching the parquet file if anyone wants to take a further look.
>>
>> On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood <sadhan.sood@gmail.com>
>> wrote:
>>
>>> So, I am seeing this issue with Spark SQL throwing an exception when
>>> trying to read selected columns from a thrift parquet file, and also when
>>> caching them.
>>> On some further digging, I was able to narrow it down to at least one
>>> particular column type, map<string, set<string>>, that causes this issue.
>>> To reproduce this I created a test thrift file with a very basic schema and
>>> stored some sample data in a parquet file:
>>>
>>> Test.thrift
>>> ===========
>>> typedef binary SomeId
>>>
>>> enum SomeExclusionCause {
>>>   WHITELIST = 1,
>>>   HAS_PURCHASE = 2,
>>> }
>>>
>>> struct SampleThriftObject {
>>>   10: string col_a;
>>>   20: string col_b;
>>>   30: string col_c;
>>>   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
>>> }
>>> =============
>>>
>>> And loading the data in spark through schemaRDD:
>>>
>>> import org.apache.spark.sql.SchemaRDD
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>> val parquetFile = "/path/to/generated/parquet/file"
>>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>> parquetFileRDD.printSchema
>>> root
>>>  |-- col_a: string (nullable = true)
>>>  |-- col_b: string (nullable = true)
>>>  |-- col_c: string (nullable = true)
>>>  |-- col_d: map (nullable = true)
>>>  |    |-- key: string
>>>  |    |-- value: array (valueContainsNull = true)
>>>  |    |    |-- element: string (containsNull = false)
>>>
>>> parquetFileRDD.registerTempTable("test")
>>> sqlContext.cacheTable("test")
>>> sqlContext.sql("select col_a from test").collect() // <-- see the exception stack here
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
>>> value at 0 in block -1 in file file:/tmp/xyz/part-r-00000.parquet
>>> at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>> at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>>> at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>>> at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>> at java.util.ArrayList.get(ArrayList.java:431)
>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>> at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>> at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>> at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>> at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>> at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>> ... 27 more
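
One possible reading of the "Caused by" frames above, not confirmed anywhere
in this thread: ArrayList.get is called with index -1 from inside
GroupColumnIO.getLast, which is what "take the last child" would produce on a
group that has no children. The sketch below reproduces only that indexing
pattern with a plain java.util.ArrayList. The name lastChild is a
hypothetical stand-in for illustration; it is not Parquet code.

```scala
import java.util.{ArrayList => JArrayList}
import scala.util.Try

object LastChildSketch {
  // Hypothetical stand-in for "return the last child of a group".
  // On an empty list, size() - 1 is -1, so get(-1) throws.
  def lastChild(children: JArrayList[String]): String =
    children.get(children.size() - 1)

  def main(args: Array[String]): Unit = {
    val populated = new JArrayList[String]()
    populated.add("key")
    populated.add("value")
    println(lastChild(populated)) // last element of a non-empty list

    // An empty child list: the lookup fails instead of returning a child.
    val empty = new JArrayList[String]()
    println(Try(lastChild(empty)))
  }
}
```

The exact exception class for the empty case depends on the JVM version, but
it is always a subclass of IndexOutOfBoundsException. If this reading is
right, the thing to look for is a group in the requested schema that ends up
with zero children.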
>>>
>>> If you take out the col_d from the thrift file, the problem goes away.
>>> The problem also shows up when trying to read the particular column without
>>> caching the table first. The same file can be dumped/read using
>>> parquet-tools just fine. Here is the file dump using parquet-tools:
>>>
>>> row group 0
>>> --------------------------------------------------------------------------------
>>> col_a:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
>>> col_b:           BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
>>> col_c:           BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
>>> col_d:
>>> .map:
>>> ..key:           BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
>>> ..value:
>>> ...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...
>>>
>>>     col_a TV=9 RL=0 DL=1
>>>     ----------------------------------------------------------------------------
>>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>>
>>>     col_b TV=9 RL=0 DL=1
>>>     ----------------------------------------------------------------------------
>>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>>
>>>     col_c TV=9 RL=0 DL=1
>>>     ----------------------------------------------------------------------------
>>>     page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9
>>>
>>>     col_d.map.key TV=9 RL=1 DL=2
>>>     ----------------------------------------------------------------------------
>>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>>
>>>     col_d.map.value.value_tuple TV=9 RL=2 DL=4
>>>     ----------------------------------------------------------------------------
>>>     page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9
>>>
>>> BINARY col_a
>>> --------------------------------------------------------------------------------
>>> *** row group 1 of 1, values 1 to 9 ***
>>> value 1: R:1 D:1 V:a1
>>> value 2: R:1 D:1 V:a2
>>> value 3: R:1 D:1 V:a3
>>> value 4: R:1 D:1 V:a4
>>> value 5: R:1 D:1 V:a5
>>> value 6: R:1 D:1 V:a6
>>> value 7: R:1 D:1 V:a7
>>> value 8: R:1 D:1 V:a8
>>> value 9: R:1 D:1 V:a9
>>>
>>> BINARY col_b
>>> --------------------------------------------------------------------------------
>>> *** row group 1 of 1, values 1 to 9 ***
>>> value 1: R:1 D:1 V:b1
>>> value 2: R:1 D:1 V:b2
>>> value 3: R:1 D:1 V:b3
>>> value 4: R:1 D:1 V:b4
>>> value 5: R:1 D:1 V:b5
>>> value 6: R:1 D:1 V:b6
>>> value 7: R:1 D:1 V:b7
>>> value 8: R:1 D:1 V:b8
>>> value 9: R:1 D:1 V:b9
>>>
>>> BINARY col_c
>>> --------------------------------------------------------------------------------
>>> *** row group 1 of 1, values 1 to 9 ***
>>> value 1: R:1 D:1 V:c1
>>> value 2: R:1 D:1 V:c2
>>> value 3: R:1 D:1 V:c3
>>> value 4: R:1 D:1 V:c4
>>> value 5: R:1 D:1 V:c5
>>> value 6: R:1 D:1 V:c6
>>> value 7: R:1 D:1 V:c7
>>> value 8: R:1 D:1 V:c8
>>> value 9: R:1 D:1 V:c9
>>>
>>> BINARY col_d.map.key
>>> --------------------------------------------------------------------------------
>>> *** row group 1 of 1, values 1 to 9 ***
>>> value 1: R:0 D:0 V:<null>
>>> value 2: R:0 D:0 V:<null>
>>> value 3: R:0 D:0 V:<null>
>>> value 4: R:0 D:0 V:<null>
>>> value 5: R:0 D:0 V:<null>
>>> value 6: R:0 D:0 V:<null>
>>> value 7: R:0 D:0 V:<null>
>>> value 8: R:0 D:0 V:<null>
>>> value 9: R:0 D:0 V:<null>
>>>
>>> BINARY col_d.map.value.value_tuple
>>> --------------------------------------------------------------------------------
>>> *** row group 1 of 1, values 1 to 9 ***
>>> value 1: R:0 D:0 V:<null>
>>> value 2: R:0 D:0 V:<null>
>>> value 3: R:0 D:0 V:<null>
>>> value 4: R:0 D:0 V:<null>
>>> value 5: R:0 D:0 V:<null>
>>> value 6: R:0 D:0 V:<null>
>>> value 7: R:0 D:0 V:<null>
>>> value 8: R:0 D:0 V:<null>
>>> value 9: R:0 D:0 V:<null>
>>>
>>>
>>> I am happy to provide more information but any help is appreciated.
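
Since the failure is reported for different projections, both with and
without caching, it can help to record which queries fail in one pass. Below
is a minimal sketch of such a helper. QueryProbe, probe, failing and the
runQuery hook are all hypothetical names introduced here; only the query
strings and the sqlContext.sql(...).collect() call come from the messages
above.

```scala
import scala.util.{Failure, Success, Try}

object QueryProbe {
  // A query paired with the error it raised, if any.
  type Outcome = (String, Option[Throwable])

  /** Runs each query through `runQuery` and records whether it threw.
    * `runQuery` is a caller-supplied hook, so this object has no
    * dependency on Spark itself.
    */
  def probe(queries: Seq[String])(runQuery: String => Any): Seq[Outcome] =
    queries.map { query =>
      val error: Option[Throwable] = Try(runQuery(query)) match {
        case Success(_) => None
        case Failure(e) => Some(e)
      }
      query -> error
    }

  /** Returns only the queries that failed, in their original order. */
  def failing(results: Seq[Outcome]): Seq[String] =
    results.collect { case (query, Some(_)) => query }
}
```

In a Spark shell the hook could be q => sqlContext.sql(q).collect(), with
queries such as "select col_a from test", "select col_d from test" and
"select * from test", run once before and once after cacheTable("test").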
>>>
>>>
>>> On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood <sadhan.sood@gmail.com>
>>> wrote:
>>>
>>>> Hi Cheng,
>>>>
>>>> I tried reading the parquet file (on which we were getting the
>>>> exception) through parquet-tools, and it is able to dump the file and I
>>>> can read the metadata, etc. I also loaded the file through a hive table
>>>> and can run a table scan query on it as well. Let me know if I can do
>>>> more to help resolve the problem. In the meantime, I'll run it through a
>>>> debugger and see if I can get more information.
>>>>
>>>> Thanks,
>>>> Sadhan
>>>>
>>>> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian <lian.cs.zju@gmail.com>
>>>> wrote:
>>>>
>>>>>  (Forgot to cc user mail list)
>>>>>
>>>>>
>>>>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>>>>
>>>>> Hey Sadhan,
>>>>>
>>>>>  Thanks for the additional information, this is helpful. It seems that
>>>>> some Parquet internal contract was broken, but I'm not sure whether it's
>>>>> caused by Spark SQL or Parquet, or whether the Parquet file itself was
>>>>> damaged somehow. I'm investigating this. In the meantime, would you mind
>>>>> helping to narrow down the problem by trying to scan exactly the same
>>>>> Parquet file with some other systems (e.g. Hive or Impala)? If other
>>>>> systems work, then there must be something wrong with Spark SQL.
>>>>>
>>>>>  Cheng
>>>>>
>>>>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood <sadhan.sood@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Cheng,
>>>>>>
>>>>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>>>>
>>>>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>>>         at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>>         at java.util.ArrayList.get(ArrayList.java:431)
>>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>>>         at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
>>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>>>         at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>>>         at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>>>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>>>         ... 26 more
>>>>>>
>>>>>>
>>>>>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian <lian.cs.zju@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>  Hi Sadhan,
>>>>>>>
>>>>>>> Could you please provide the stack trace of the
>>>>>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first
>>>>>>> query succeeds is that Spark SQL doesn’t bother reading all the data
>>>>>>> from the table to give COUNT(*). In the second case, however, the
>>>>>>> whole table is asked to be cached lazily via the cacheTable call, so
>>>>>>> it’s scanned to build the in-memory columnar cache. Then something
>>>>>>> went wrong while scanning this LZO-compressed Parquet file.
>>>>>>> Unfortunately, the stack trace at hand doesn’t indicate the root
>>>>>>> cause.
>>>>>>>
>>>>>>> Cheng
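
The behaviour described above (the same COUNT(*) working before cacheTable
and failing after it) can be illustrated with a toy model. This is a sketch
under stated assumptions, not Spark's implementation: ToyTable, cheapCount
and fullScan are hypothetical names, where cheapCount stands in for a path
that answers the count without decoding column values and fullScan stands in
for the scan that builds the in-memory cache.

```scala
object LazyCacheSketch {
  /** Toy table with a cheap count path and a full-scan path.
    * Both constructor arguments are hypothetical hooks.
    */
  final class ToyTable(cheapCount: () => Long, fullScan: () => Vector[String]) {
    private var cacheRequested = false
    private var cached: Option[Vector[String]] = None

    // Lazy, like the cacheTable call described above:
    // this only marks the table, nothing is scanned yet.
    def cacheTable(): Unit = cacheRequested = true

    def count(): Long =
      if (!cacheRequested) cheapCount()
      else {
        // The first query after cacheTable() pays for the full scan,
        // so a decoding error surfaces here rather than in cacheTable().
        if (cached.isEmpty) cached = Some(fullScan())
        cached.get.size.toLong
      }
  }
}
```

With a fullScan that throws, count() succeeds before cacheTable() and fails
on the first call after it, which matches the order of events in the
original report.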
>>>>>>>
>>>>>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>>>>>
>>>>>>> While testing SparkSQL on a bunch of parquet files (basically what
>>>>>>> used to be a partition for one of our hive tables), I encountered
>>>>>>> this error:
>>>>>>>
>>>>>>> import org.apache.spark.sql.SchemaRDD
>>>>>>> import org.apache.hadoop.fs.FileSystem
>>>>>>> import org.apache.hadoop.conf.Configuration
>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>
>>>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>>>
>>>>>>> // parquetFile: path to the Parquet partition (not shown in this message)
>>>>>>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() // <-- works fine
>>>>>>> sqlContext.cacheTable("xyz_20141109")
>>>>>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() // <-- fails with an exception
>>>>>>>
>>>>>>> parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file
>>>>>>> hdfs://xxxxxxxx::9000/event_logs/xyz/20141109/part-00009359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-00009.lzo.parquet
>>>>>>>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>>>>         at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>>>>         at org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>         at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>>>>>>         at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>>>>>>         at org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
>>>>>>>         at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>         at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
>>>>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException