spark-user mailing list archives

From Ayoub <benali.ayoub.i...@gmail.com>
Subject Re: [hive context] Unable to query array once saved as parquet
Date Mon, 02 Feb 2015 15:06:39 GMT
Hi,

Given the currently open issue
https://issues.apache.org/jira/browse/SPARK-5508, I cannot use HiveQL to
insert SchemaRDD data into a table if one of the columns is an array of
structs.

Using the Spark API, is it possible to insert a SchemaRDD into an existing
and *partitioned* table?
The "insertInto" method on SchemaRDD takes only the name of the table.

Thanks,
Ayoub.

2015-01-31 22:30 GMT+01:00 Ayoub Benali <benali.ayoub.info@gmail.com>:

> Hello,
>
> As asked, I just filed this JIRA issue
> <https://issues.apache.org/jira/browse/SPARK-5508>.
>
> I will add another, similar code example which leads to a "GenericRow cannot
> be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow"
> exception.
>
> Best,
> Ayoub.
>
>
> 2015-01-31 4:05 GMT+01:00 Cheng Lian <lian.cs.zju@gmail.com>:
>
>> According to the Gist Ayoub provided, the schema is fine. I reproduced
>> this issue locally; it looks like a bug, but I don't think it's related to
>> SPARK-5236. I will investigate this soon.
>>
>> Ayoub, would you mind filing a JIRA for this issue? Thanks!
>>
>> Cheng
>>
>> On 1/30/15 11:28 AM, Michael Armbrust wrote:
>>
>> Is it possible that your schema contains duplicate columns or columns with
>> spaces in their names?  The parquet library will often give confusing error
>> messages in this case.
>>
>> On Fri, Jan 30, 2015 at 10:33 AM, Ayoub <benali.ayoub.info@gmail.com>
>> wrote:
>>
>>>  Hello,
>>>
>>> I have a problem when querying, with a Hive context on Spark
>>> 1.2.1-snapshot, a column in my table that is a nested data structure such
>>> as an array of structs.
>>> The problem happens only on the table stored as Parquet; querying the
>>> SchemaRDD saved as a temporary table does not lead to any exception.
>>>
>>> My steps are:
>>> 1) reading a JSON file
>>> 2) creating a SchemaRDD and saving it as a temporary table
>>> 3) creating an external table in the Hive metastore, stored as Parquet
>>> 4) inserting the data from the temporary table into the persisted table
>>> 5) querying the persisted table, which leads to this exception:
>>>
>>> "select data.field1 from persisted_table LATERAL VIEW
>>> explode(data_array) nestedStuff AS data"
>>>
>>> parquet.io.ParquetDecodingException: Can not read value at 0 in block -1
>>> in file hdfs://***/test_table/part-00001
>>>     at
>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>     at
>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>     at
>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>     at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>     at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>     at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>     at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>     at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>>>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>>>     at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>>>     at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>     at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>>>     at java.util.ArrayList.get(ArrayList.java:411)
>>>     at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
>>>     at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
>>>     at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
>>>     at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
>>>     at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94)
>>>     at
>>> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:274)
>>>     at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>     at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>     at
>>> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>     at
>>> parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>     at
>>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>     at
>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>     ... 28 more
>>>
>>> Driver stacktrace:
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>     at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>     at scala.Option.foreach(Option.scala:236)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>     at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>     at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>     at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> The full code leading to this issue is available here: gist
>>> <https://gist.github.com/ayoub-benali/54d6f3b8635530e4e936>
>>>
>>> Could the problem come from the way I insert the data into the table?
>>>
>>> Is this problem related to this JIRA ticket:
>>> https://issues.apache.org/jira/browse/SPARK-5236 ?
>>>
>>> I ask because I got a similar exception, "GenericRow cannot be cast to
>>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow", with another
>>> table that also contains an array of structs.
>>>
>>> Thanks,
>>> Ayoub.
>>>
>>> ------------------------------
>>> View this message in context: [hive context] Unable to query array once
>>> saved as parquet
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/hive-context-Unable-to-query-array-once-saved-as-parquet-tp21446.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>>
>>
>
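
Michael's suggestion in the quoted thread (checking the schema for duplicate
column names or names containing spaces) can be sketched as a small check run
before writing to Parquet. This is only an illustrative sketch, not code from
the thread: the helper name find_suspicious_columns and the sample column
list are hypothetical, and the case-insensitive comparison assumes that the
Hive metastore treats column names as case-insensitive.

```python
from collections import Counter


def find_suspicious_columns(column_names):
    """Return (duplicates, names_with_whitespace) for a list of column names."""
    # Compare case-insensitively, on the assumption that the Hive metastore
    # treats column names as case-insensitive.
    counts = Counter(name.lower() for name in column_names)
    duplicates = sorted(name for name, n in counts.items() if n > 1)
    # Flag any name containing a space, tab, or other whitespace character.
    with_whitespace = [name for name in column_names
                       if any(ch.isspace() for ch in name)]
    return duplicates, with_whitespace


# Hypothetical top-level column names; only "data_array" appears in the thread.
columns = ["id", "data_array", "Data_Array", "event time"]
duplicates, with_whitespace = find_suspicious_columns(columns)
print("duplicates:", duplicates)
print("names with whitespace:", with_whitespace)
```

The sketch only inspects top-level names; fields nested inside structs would
need the same check applied recursively. In this thread the schema turned out
to be fine, so a check like this is a first diagnostic step rather than a fix.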



