spark-issues mailing list archives

From "Raul Saez Tapia (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-14927) DataFrame. saveAsTable creates RDD partitions but not Hive partitions
Date Thu, 27 Oct 2016 06:51:58 GMT

    [ https://issues.apache.org/jira/browse/SPARK-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15610900#comment-15610900
] 

Raul Saez Tapia edited comment on SPARK-14927 at 10/27/16 6:51 AM:
-------------------------------------------------------------------

[~xwu0226] your example works fine for me with Spark 1.6.1. However, it does not work
when we use a UDT.

My DataFrame shows:
{code:scala}
scala> model_date.toDF.show
+--------+--------------------+
|    date|               model|
+--------+--------------------+
|20160610|[aa.bb.spark.types.PersonWrapper@8542...|
|20160610|[aa.bb.spark.types.PersonWrapper@8831......|
...
...
+--------+--------------------+
{code}

I created the table with specific properties that describe how the table is defined
and how to map the PersonType UDT to the table schema:
{code:sql}
create table model_orc (`model` struct<personWrapper:struct<id:int,name:string>>)
PARTITIONED BY (`date` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES ('path'='hdfs:///user/raulsaez/model_orc') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'hdfs:///user/raulsaez/model_orc'
TBLPROPERTIES('spark.sql.sources.schema.numParts'='1','spark.sql.sources.schema.part.0'='{
\"type\":\"struct\",\"fields\":[{ \"name\":\"personWrapper\",\"type\":{ \"type\":\"udt\",\"class\":\"aa.bb.spark.types.PersonType\",\"pyClass\":null,\"sqlType\":{
\"type\":\"struct\",\"fields\":[{ \"name\":\"id\",\"type\":   \"integer\",\"nullable\":true,\"metadata\":{}
} ,{ \"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{} }] } },\"nullable\":true,\"metadata\":{}
}] }')
{code}

Now we insert data into the table:
{code:scala}
scala> hiveContext.sql("insert into model_orc partition(date=20160610) select model,date from dfJune")
org.apache.spark.sql.AnalysisException: cannot resolve 'cast(model as struct<person:struct<personWrapper:struct<id:int,name:string>>>)'
due to data type mismatch: cannot cast
StructType(StructField(personWrapper,,true)),true)
to
StructType(StructField(person,StructType(StructField(id,IntegerType,true),StructField(name,StringType,true),),true)
{code}
I have the same issue with both Parquet and ORC.



And if I persist the DataFrame as a table with ORC:
{code:scala}
model_date.toDF.write.format("orc").partitionBy("date").saveAsTable("model_orc_asTable")
{code}

Or even if I persist it as an ORC file:
{code:scala}
scala> model_date.toDF.write.mode(SaveMode.Append).format("orc").partitionBy("date").save("model_orc")
{code}

I get the ClassCastException:
{code:scala}
Caused by: java.lang.ClassCastException: aa.bb.spark.types.PersonType cannot be cast to org.apache.spark.sql.types.StructType
        at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:557)
        at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
        at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:568)
        at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
        at org.apache.spark.sql.hive.HiveInspectors$$anonfun$wrap$1.apply(HiveInspectors.scala:590)
        at org.apache.spark.sql.hive.HiveInspectors$$anonfun$wrap$1.apply(HiveInspectors.scala:589)
        at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
        at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:589)
        at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
        at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:568)
        at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
        at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrapOrcStruct(OrcRelation.scala:128)
        at org.apache.spark.sql.hive.orc.OrcOutputWriter.writeInternal(OrcRelation.scala:139)
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:358)
        ... 8 more
{code}


If I persist the DataFrame as a table with Parquet:
{code:scala}
scala> model_date.toDF.write.mode(SaveMode.Append).format("parquet").partitionBy("date").saveAsTable("model_parquet_asTable")
16/10/27 09:39:24 WARN HiveContext$$anon$2: Persisting partitioned data source relation `model_parquet_asTable`
into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input
path(s):
hdfs://dev-nameservice/apps/hive/warehouse/model_parquet_astable
...
...
...
scala> hiveContext.sql("select * from model_parquet_asTable where date=20160610").show
+--------------------+--------+
|               model|    date|
+--------------------+--------+
|[aa.bb.spark.types.PersonWrapper@8542...|20160610|
|[aa.bb.spark.types.PersonWrapper@8831...|20160610|
|[aa.bb.spark.types.PersonWrapper@3661...|20160610|
...
...
...
+--------------------+--------+
only showing top 20 rows
{code}


From Hive I can see the table, though it looks empty (I know that Spark warned me it was
stored in a Spark SQL specific format):
{code:scala}
0: jdbc:hive2://imp1tvhdpedg1.corp.du.ae:1000> select * from model_parquet_astable;
+----------------------------+--+
| model_parquet_astable.col  |
+----------------------------+--+
+----------------------------+--+
No rows selected (0.057 seconds)
{code}



And if I persist the DataFrame as a Parquet file, it works fine:
{code:scala}
scala> model_date.toDF.write.mode(SaveMode.Append).format("parquet").partitionBy("date").save("model_parquet_asTable")
...
...
$ hdfs dfs -ls -R
drwx------   - raulsaez raulsaez          0 2016-10-27 09:27 model_parquet_asTable/date=20160610
-rw-------   3 raulsaez raulsaez      36725 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00000-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      34638 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00001-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      38752 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00002-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      27702 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00003-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      41743 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00004-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      35128 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00005-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      40996 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00006-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez      29046 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00007-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
{code}


Do you know if there is a way to persist a DataFrame with UDT columns as a partitioned table
that is also usable from Hive? For now we have no chance to upgrade to Spark 2.0; in
addition, I don't even know whether UDTs work correctly with Spark 2.0.


Thanks in advance!






> DataFrame. saveAsTable creates RDD partitions but not Hive partitions
> ---------------------------------------------------------------------
>
>                 Key: SPARK-14927
>                 URL: https://issues.apache.org/jira/browse/SPARK-14927
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2, 1.6.1
>         Environment: Mac OS X 10.11.4 local
>            Reporter: Sasha Ovsankin
>
> This is a followup to the Stack Overflow question at http://stackoverflow.com/questions/31341498/save-spark-dataframe-as-dynamic-partitioned-table-in-hive
> I tried to use the suggestions in the answers but couldn't make them work in Spark 1.6.1.
> I am trying to create partitions programmatically from a `DataFrame`. Here is the relevant code (adapted from a Spark test):
>     hc.setConf("hive.metastore.warehouse.dir", "tmp/tests")
>     //    hc.setConf("hive.exec.dynamic.partition", "true")
>     //    hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
>     hc.sql("create database if not exists tmp")
>     hc.sql("drop table if exists tmp.partitiontest1")
>     Seq(2012 -> "a").toDF("year", "val")
>       .write
>       .partitionBy("year")
>       .mode(SaveMode.Append)
>       .saveAsTable("tmp.partitiontest1")
>     hc.sql("show partitions tmp.partitiontest1").show
> Full file is here: https://gist.github.com/SashaOv/7c65f03a51c7e8f9c9e018cd42aa4c4a
> I get the error that the table is not partitioned:
>     ======================
>     HIVE FAILURE OUTPUT
>     ======================
>     SET hive.support.sql11.reserved.keywords=false
>     SET hive.metastore.warehouse.dir=tmp/tests
>     OK
>     OK
>     FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table tmp.partitiontest1 is not a partitioned table
>     ======================
> It looks like the root cause is that `org.apache.spark.sql.hive.HiveMetastoreCatalog.newSparkSQLSpecificMetastoreTable` always creates the table with empty partitions.
> Any help to move this forward is appreciated.
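
For readers hitting the same "is not a partitioned table" error: one approach that may avoid the Spark SQL specific metastore format is to create the partitioned table through Hive DDL first and then fill it with a dynamic-partition insert, instead of calling saveAsTable. The following is only a minimal sketch, assuming a spark-shell session on Spark 1.6.x built with Hive support (so `sc` is predefined). The table name `tmp.partitiontest2` and the temp table name `partitiontest_staging` are hypothetical and not part of the original report, and the sketch has not been verified against the reporter's environment.

```scala
import org.apache.spark.sql.hive.HiveContext

// Assumption: running inside spark-shell, where `sc` (SparkContext) already exists.
val hc = new HiveContext(sc)
import hc.implicits._

// Allow Hive to derive partition values from the data being inserted.
hc.setConf("hive.exec.dynamic.partition", "true")
hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

hc.sql("create database if not exists tmp")
hc.sql("drop table if exists tmp.partitiontest2")

// Create the table with Hive DDL so the metastore records `year` as a real Hive partition column.
hc.sql("create table tmp.partitiontest2 (`val` string) partitioned by (`year` int)")

// Same sample data as in the issue description, exposed as a temporary table (hypothetical name).
Seq(2012 -> "a").toDF("year", "val").registerTempTable("partitiontest_staging")

// Dynamic-partition insert: the partition column must come last in the select list.
hc.sql("insert into table tmp.partitiontest2 partition (`year`) select `val`, `year` from partitiontest_staging")

hc.sql("show partitions tmp.partitiontest2").show
```

This sketch covers only plain column types. It does not address the UDT case described in the comment above, where the write fails before partitioning is involved.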



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

