spark-issues mailing list archives

From "Manu Zhang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-27689) Error to execute hive views with spark
Date Thu, 01 Aug 2019 10:25:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-27689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897948#comment-16897948 ]

Manu Zhang commented on SPARK-27689:
------------------------------------

[~lambda], [~hyukjin.kwon], [~yumwang],

This issue should be fixed by [PR#24960|https://github.com/apache/spark/pull/24960] ([PR#25068|https://github.com/apache/spark/pull/25068] for branch-2.4 and [PR#25293|https://github.com/apache/spark/pull/25293] for branch-2.3), where the analysis of a View is deferred to the optimizer.

Before that change, the View was analyzed eagerly, and JOIN would try to deduplicate conflicting attributes (since you are joining on the same id_person column) by *replacing* the expression ids of the View's plan in the *right* branch. That's where things went wrong. Take a look at the logical plan under View:
{code:java}
+- View (`schema_p`.`person_product_v`, [id_person#76,id_product#77,country#78,city#79,price#80,start_date#81,end_date#82])
  +- !Project [cast(id_person#103 as int) AS id_person#76, cast(id_product#104 as int) AS id_product#77, cast(country#105 as string) AS country#78, cast(city#106 as string) AS city#79, cast(price#107 as decimal(38,8)) AS price#80, cast(start_date#108 as date) AS start_date#81, cast(end_date#109 as date) AS end_date#82]
    +- Project [cast(id_person#7 as int) AS id_person#0, cast(id_product#8 as int) AS id_product#1, cast(country#9 as string) AS country#2, cast(city#10 as string) AS city#3, cast(price#11 as decimal(38,8)) AS price#4, cast(start_date#12 as date) AS start_date#5, cast(end_date#13 as date) AS end_date#6]
{code}
The input of the {{!Project}} node (e.g. id_person#103) doesn't match the output of its child (e.g. id_person#0); the {{!}} prefix marks exactly this kind of invalid node. That's because id_person#103 was substituted for id_person#0 during deduplication, while the id_person#0 produced by the child could not be replaced, since it is the output of an Alias rather than an attribute (check out Alias).
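
To make that concrete, here is a minimal, purely illustrative sketch (not the actual deduplication rule; the exprIds are borrowed from the plan above, and the variable names are mine) of why a rewrite that only substitutes {{AttributeReference}}s leaves an Alias's output id untouched:
{code}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, ExprId}
import org.apache.spark.sql.types.{IntegerType, StringType}

// The producing side of the view plan: `cast(id_person#7 as int) AS id_person#0`.
val input = AttributeReference("id_person", StringType)(exprId = ExprId(7))
val producer = Alias(Cast(input, IntegerType), "id_person")(exprId = ExprId(0))

// A dedup-style rewrite maps AttributeReference ids (here 0 -> 103). Any
// *reference* to id_person#0 higher up in the plan gets rewritten, but the
// rule never matches the Alias node itself, so the producer keeps emitting
// id_person#0 and the parent's inputs no longer line up with it.
val rewritten = producer.transform {
  case a: AttributeReference if a.exprId == ExprId(0) => a.withExprId(ExprId(103))
}
assert(rewritten.asInstanceOf[Alias].exprId == ExprId(0)) // still id_person#0
{code}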
  
 This can be reproduced with a UT like 
{code}
test("SparkSQL failed to resolve attributes with nested self-joins on hive view") { 
  withTable("hive_table") { 
    withView("hive_view", "temp_view1", "temp_view2") { 
       sql("CREATE TABLE hive_table AS SELECT 1 AS id") 
       sql("CREATE VIEW hive_view AS SELECT id FROM hive_table") 
       sql("CREATE TEMPORARY VIEW temp_view1 AS SELECT id FROM hive_view") 
       sql("CREATE TEMPORARY VIEW temp_view2 AS SELECT a.id " + 
          "FROM temp_view1 AS a JOIN temp_view1 AS b ON a.id = b.id") 
       val df = sql("SELECT c.id FROM temp_view1 AS c JOIN temp_view2 AS d ON c.id = d.id")

       checkAnswer(df, Row(1)) 
    } 
  }
}
{code}
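
For reference, the same shape should be reproducible from plain SQL in a Hive-enabled session, without building a UT (this is just the test above transliterated; I haven't run it in this exact form):
{code}
CREATE TABLE hive_table AS SELECT 1 AS id;
CREATE VIEW hive_view AS SELECT id FROM hive_table;
CREATE TEMPORARY VIEW temp_view1 AS SELECT id FROM hive_view;
CREATE TEMPORARY VIEW temp_view2 AS
  SELECT a.id FROM temp_view1 AS a JOIN temp_view1 AS b ON a.id = b.id;
-- On affected versions this last query fails with
-- "Resolved attribute(s) ... missing from ..."
SELECT c.id FROM temp_view1 AS c JOIN temp_view2 AS d ON c.id = d.id;
{code}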

> Error to execute hive views with spark
> --------------------------------------
>
>                 Key: SPARK-27689
>                 URL: https://issues.apache.org/jira/browse/SPARK-27689
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.3, 2.4.3
>            Reporter: Lambda
>            Priority: Major
>
> I get a Python error when I execute the following code using Hive views, but it works correctly when I run it with Hive tables.
> *Hive databases:*
> {code:java}
> CREATE DATABASE schema_p LOCATION "hdfs:///tmp/schema_p";
> {code}
> *Hive tables:*
> {code:java}
> CREATE TABLE schema_p.product(
>  id_product string,
>  name string,
>  country string,
>  city string,
>  start_date string,
>  end_date string
>  )
>  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
>  STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
>  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
>  LOCATION 'hdfs:///tmp/schema_p/product';
> {code}
> {code:java}
> CREATE TABLE schema_p.person_product(
>  id_person string,
>  id_product string,
>  country string,
>  city string,
>  price string,
>  start_date string,
>  end_date string
>  )
>  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
>  STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
>  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
>  LOCATION 'hdfs:///tmp/schema_p/person_product';
> {code}
> *Hive views:*
> {code:java}
> CREATE VIEW schema_p.product_v AS SELECT CAST(id_product AS INT) AS id_product, name AS name, country AS country, city AS city, CAST(start_date AS DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.product;
> 
> CREATE VIEW schema_p.person_product_v AS SELECT CAST(id_person AS INT) AS id_person, CAST(id_product AS INT) AS id_product, country AS country, city AS city, CAST(price AS DECIMAL(38,8)) AS price, CAST(start_date AS DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person_product;
> {code}
> *Code*:
> {code:java}
> def read_tables(sc):
>   in_dict = { 'product': 'product_v', 'person_product': 'person_product_v' }
>   data_dict = {}
>   for n, d in in_dict.iteritems():
>     data_dict[n] = sc.read.table(d)
>   return data_dict
> def get_population(tables, ref_date_str):
>   product = tables['product']
>   person_product = tables['person_product']
>   count_prod = person_product.groupBy('id_product').agg(F.count('id_product').alias('count_prod'))
>   person_product_join = person_product.join(product,'id_product')
>   person_count = person_product_join.join(count_prod,'id_product')
>   final = person_product_join.join(person_count, 'id_person', 'left')
>   return final
> import pyspark.sql.functions as F
> import functools
> from pyspark.sql.functions import col
> from pyspark.sql.functions import add_months, lit, count, coalesce
> spark.sql('use schema_p')
> data_dict = read_tables(spark)
> data_dict
> population = get_population(data_dict, '2019-04-30')
> population.show()
> {code}
> *Error:*
> {code:java}
> Traceback (most recent call last):
> File "<stdin>", line 1, in <module>
> File "<stdin>", line 10, in get_population
> File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 931, in join
> jdf = self._jdf.join(other._jdf, on, how)
> File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
> File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
> raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: u'Resolved attribute(s) id_person#103,start_date#108,id_product#104,end_date#109,price#107,country#105,city#106 missing from price#4,id_product#1,start_date#5,end_date#6,id_person#0,city#3,country#2 in operator !Project [cast(id_person#103 as int) AS id_person#76, cast(id_product#104 as int) AS id_product#77, cast(country#105 as string) AS country#78, cast(city#106 as string) AS city#79, cast(price#107 as decimal(38,8)) AS price#80, cast(start_date#108 as date) AS start_date#81, cast(end_date#109 as date) AS end_date#82]. Attribute(s) with the same name appear in the operation: id_person,start_date,id_product,end_date,price,country,city. Please check if the right attribute(s) are used.;;
> Project [id_person#0, id_product#1, country#2, city#3, price#4, start_date#5, end_date#6, name#29, country#30, city#31, start_date#32, end_date#33, id_product#104, country#105, city#106, price#107, start_date#108, end_date#109, name#137, country#138, city#139, start_date#140, end_date#141, count_prod#61L]
> +- Join LeftOuter, (id_person#0 = id_person#103)
> :- Project [id_product#1, id_person#0, country#2, city#3, price#4, start_date#5, end_date#6, name#29, country#30, city#31, start_date#32, end_date#33]
> : +- Join Inner, (id_product#1 = id_product#28)
> : :- SubqueryAlias person_product_v
> : : +- View (`schema_p`.`person_product_v`, [id_person#0,id_product#1,country#2,city#3,price#4,start_date#5,end_date#6])
> : : +- Project [cast(id_person#7 as int) AS id_person#0, cast(id_product#8 as int) AS id_product#1, cast(country#9 as string) AS country#2, cast(city#10 as string) AS city#3, cast(price#11 as decimal(38,8)) AS price#4, cast(start_date#12 as date) AS start_date#5, cast(end_date#13 as date) AS end_date#6]
> : : +- Project [cast(id_person#14 as int) AS id_person#7, cast(id_product#15 as int) AS id_product#8, country#16 AS country#9, city#17 AS city#10, cast(price#18 as decimal(38,8)) AS price#11, cast(start_date#19 as date) AS start_date#12, cast(end_date#20 as date) AS end_date#13]
> : : +- SubqueryAlias person_product
> : : +- HiveTableRelation `schema_p`.`person_product`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id_person#14, id_product#15, country#16, city#17, price#18, start_date#19, end_date#20]
> : +- SubqueryAlias product_v
> : +- View (`schema_p`.`product_v`, [id_product#28,name#29,country#30,city#31,start_date#32,end_date#33])
> : +- Project [cast(id_product#34 as int) AS id_product#28, cast(name#35 as string) AS name#29, cast(country#36 as string) AS country#30, cast(city#37 as string) AS city#31, cast(start_date#38 as date) AS start_date#32, cast(end_date#39 as date) AS end_date#33]
> : +- Project [cast(id_product#40 as int) AS id_product#34, name#41 AS name#35, country#42 AS country#36, city#43 AS city#37, cast(start_date#44 as date) AS start_date#38, cast(end_date#45 as date) AS end_date#39]
> : +- SubqueryAlias product
> : +- HiveTableRelation `schema_p`.`product`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id_product#40, name#41, country#42, city#43, start_date#44, end_date#45]
> +- Project [id_product#104, id_person#103, country#105, city#106, price#107, start_date#108, end_date#109, name#137, country#138, city#139, start_date#140, end_date#141, count_prod#61L]
> +- Join Inner, (id_product#104 = id_product#77)
> :- Project [id_product#104, id_person#103, country#105, city#106, price#107, start_date#108, end_date#109, name#137, country#138, city#139, start_date#140, end_date#141]
> : +- Join Inner, (id_product#104 = id_product#136)
> : :- SubqueryAlias person_product_v
> : : +- View (`schema_p`.`person_product_v`, [id_person#103,id_product#104,country#105,city#106,price#107,start_date#108,end_date#109])
> : : +- Project [cast(id_person#0 as int) AS id_person#103, cast(id_product#1 as int) AS id_product#104, cast(country#2 as string) AS country#105, cast(city#3 as string) AS city#106, cast(price#4 as decimal(38,8)) AS price#107, cast(start_date#5 as date) AS start_date#108, cast(end_date#6 as date) AS end_date#109]
> : : +- Project [cast(id_person#7 as int) AS id_person#0, cast(id_product#8 as int) AS id_product#1, cast(country#9 as string) AS country#2, cast(city#10 as string) AS city#3, cast(price#11 as decimal(38,8)) AS price#4, cast(start_date#12 as date) AS start_date#5, cast(end_date#13 as date) AS end_date#6]
> : : +- Project [cast(id_person#14 as int) AS id_person#7, cast(id_product#15 as int) AS id_product#8, country#16 AS country#9, city#17 AS city#10, cast(price#18 as decimal(38,8)) AS price#11, cast(start_date#19 as date) AS start_date#12, cast(end_date#20 as date) AS end_date#13]
> : : +- SubqueryAlias person_product
> : : +- HiveTableRelation `schema_p`.`person_product`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id_person#14, id_product#15, country#16, city#17, price#18, start_date#19, end_date#20]
> : +- SubqueryAlias product_v
> : +- View (`schema_p`.`product_v`, [id_product#136,name#137,country#138,city#139,start_date#140,end_date#141])
> : +- Project [cast(id_product#28 as int) AS id_product#136, cast(name#29 as string) AS name#137, cast(country#30 as string) AS country#138, cast(city#31 as string) AS city#139, cast(start_date#32 as date) AS start_date#140, cast(end_date#33 as date) AS end_date#141]
> : +- Project [cast(id_product#34 as int) AS id_product#28, cast(name#35 as string) AS name#29, cast(country#36 as string) AS country#30, cast(city#37 as string) AS city#31, cast(start_date#38 as date) AS start_date#32, cast(end_date#39 as date) AS end_date#33]
> : +- Project [cast(id_product#40 as int) AS id_product#34, name#41 AS name#35, country#42 AS country#36, city#43 AS city#37, cast(start_date#44 as date) AS start_date#38, cast(end_date#45 as date) AS end_date#39]
> : +- SubqueryAlias product
> : +- HiveTableRelation `schema_p`.`product`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id_product#40, name#41, country#42, city#43, start_date#44, end_date#45]
> +- Aggregate [id_product#77], [id_product#77, count(id_product#77) AS count_prod#61L]
> +- SubqueryAlias person_product_v
> +- View (`schema_p`.`person_product_v`, [id_person#76,id_product#77,country#78,city#79,price#80,start_date#81,end_date#82])
> +- !Project [cast(id_person#103 as int) AS id_person#76, cast(id_product#104 as int) AS id_product#77, cast(country#105 as string) AS country#78, cast(city#106 as string) AS city#79, cast(price#107 as decimal(38,8)) AS price#80, cast(start_date#108 as date) AS start_date#81, cast(end_date#109 as date) AS end_date#82]
> +- Project [cast(id_person#7 as int) AS id_person#0, cast(id_product#8 as int) AS id_product#1, cast(country#9 as string) AS country#2, cast(city#10 as string) AS city#3, cast(price#11 as decimal(38,8)) AS price#4, cast(start_date#12 as date) AS start_date#5, cast(end_date#13 as date) AS end_date#6]
> +- Project [cast(id_person#14 as int) AS id_person#7, cast(id_product#15 as int) AS id_product#8, country#16 AS country#9, city#17 AS city#10, cast(price#18 as decimal(38,8)) AS price#11, cast(start_date#19 as date) AS start_date#12, cast(end_date#20 as date) AS end_date#13]
> +- SubqueryAlias person_product
> +- HiveTableRelation `schema_p`.`person_product`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id_person#14, id_product#15, country#16, city#17, price#18, start_date#19, end_date#20]
> '{code}


