spark-user mailing list archives

From Terry Siu <Terry....@smartfocus.com>
Subject Re: SparkSQL - TreeNodeException for unresolved attributes
Date Mon, 20 Oct 2014 19:22:16 GMT
Hi Michael,

Thanks again for the reply. Was hoping it was something I was doing wrong in 1.1.0, but I’ll
try master.

Thanks,
-Terry

From: Michael Armbrust <michael@databricks.com>
Date: Monday, October 20, 2014 at 12:11 PM
To: Terry Siu <terry.siu@smartfocus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Have you tried this on master?  There were several problems with resolution of complex queries
that were registered as tables in the 1.1.0 release.

On Mon, Oct 20, 2014 at 10:33 AM, Terry Siu <Terry.Siu@smartfocus.com> wrote:
Hi all,

I’m getting a TreeNodeException for unresolved attributes when I do a simple select from
a SchemaRDD generated by a join in Spark 1.1.0. A little background first: I am using a HiveContext
(against Hive 0.12) to grab two tables, join them, and then perform multiple INSERT-SELECT
statements with GROUP BY to write back out to a Hive rollup table that has two partitions. This task
is an effort to simulate GROUPING SETS, which SparkSQL does not support.
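
The multi-statement emulation described above can be sketched roughly as follows. This is a minimal sketch and not the code used in this thread: the target table name rollup_table, the choice of dimensions, and the helper rollupStatements are all hypothetical, and the PARTITION clause is left out because the partition columns are not shown here. It only builds the SQL strings (one INSERT-SELECT per grouping set), so it runs without Spark.

```scala
// Sketch only: table names, dimensions and this helper are hypothetical.
object GroupingSetsSketch {
  /** Builds one INSERT ... SELECT ... GROUP BY statement per grouping set. */
  def rollupStatements(
      targetTable: String,
      sourceTable: String,
      dimensions: Seq[String],
      groupingSets: Seq[Set[String]],
      aggregates: Seq[String]): Seq[String] =
    groupingSets.map { set =>
      // Dimensions outside the current set are emitted as NULL so that every
      // statement produces the same column layout for the target table.
      val selectDims = dimensions.map { d =>
        if (set.contains(d)) d else s"CAST(NULL AS STRING) AS $d"
      }
      val groupCols = dimensions.filter(set.contains)
      // An empty grouping set is a grand total, so it gets no GROUP BY clause.
      val groupBy =
        if (groupCols.isEmpty) "" else groupCols.mkString(" GROUP BY ", ", ", "")
      val selectList = (selectDims ++ aggregates).mkString(", ")
      s"INSERT INTO TABLE $targetTable SELECT $selectList FROM $sourceTable$groupBy"
    }
}

// Example: three grouping sets over two dimensions of the joined table.
val statements = GroupingSetsSketch.rollupStatements(
  targetTable = "rollup_table", // hypothetical
  sourceTable = "rupbrand",
  dimensions = Seq("channel", "product_brandname"),
  groupingSets = Seq(Set("channel", "product_brandname"), Set("channel"), Set.empty[String]),
  aggregates = Seq("SUM(sales) AS sales"))
statements.foreach(println)
```

Each generated string could then be passed to HiveContext.sql in turn; whether that performs acceptably compared with a native GROUPING SETS implementation is not something this sketch addresses.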

In my first attempt, I got really close using SchemaRDD.groupBy until I realized that the SchemaRDD.insertInto
API does not support partitioned tables yet. This prompted my second attempt, which passes SQL
to the HiveContext.sql API instead.

Here’s a rundown of the commands I executed on the spark-shell:


val hc = new HiveContext(sc)
hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
hc.setConf("spark.sql.parquet.compression.codec", "snappy")

// For implicit conversions to Expression
val sqlContext = new SQLContext(sc)
import sqlContext._

val segCusts = hc.hql("select …")
val segTxns = hc.hql("select …")

val sc = segCusts.as('sc)
val st = segTxns.as('st)

// Join the segCusts and segTxns tables
val rup = sc.join(st, Inner, Some("sc.segcustomerid".attr==="st.customerid".attr))
rup.registerAsTable("rupbrand")



If I do a printSchema on rup, I get:

root
 |-- segcustomerid: string (nullable = true)
 |-- sales: double (nullable = false)
 |-- tx_count: long (nullable = false)
 |-- storeid: string (nullable = true)
 |-- transdate: long (nullable = true)
 |-- transdate_ts: string (nullable = true)
 |-- transdate_dt: string (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- translineitem: string (nullable = true)
 |-- offerid: string (nullable = true)
 |-- customerid: string (nullable = true)
 |-- customerkey: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- returnquantity: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- unitcost: double (nullable = true)
 |-- transid: string (nullable = true)
 |-- productid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- campaign_campaigncost: double (nullable = true)
 |-- campaign_begindate: long (nullable = true)
 |-- campaign_begindate_ts: string (nullable = true)
 |-- campaign_begindate_dt: string (nullable = true)
 |-- campaign_enddate: long (nullable = true)
 |-- campaign_enddate_ts: string (nullable = true)
 |-- campaign_enddate_dt: string (nullable = true)
 |-- campaign_campaigntitle: string (nullable = true)
 |-- campaign_campaignname: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- product_categoryid: string (nullable = true)
 |-- product_company: string (nullable = true)
 |-- product_brandname: string (nullable = true)
 |-- product_vendorid: string (nullable = true)
 |-- product_color: string (nullable = true)
 |-- product_brandid: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_size: string (nullable = true)
 |-- product_subcategoryid: string (nullable = true)
 |-- product_departmentid: string (nullable = true)
 |-- product_productname: string (nullable = true)
 |-- product_categoryname: string (nullable = true)
 |-- product_vendorname: string (nullable = true)
 |-- product_sku: string (nullable = true)
 |-- product_subcategoryname: string (nullable = true)
 |-- product_status: string (nullable = true)
 |-- product_departmentname: string (nullable = true)
 |-- product_style: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_lastname: string (nullable = true)
 |-- customer_familystatus: string (nullable = true)
 |-- customer_customertype: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_region: string (nullable = true)
 |-- customer_customergroup: string (nullable = true)
 |-- customer_maritalstatus: string (nullable = true)
 |-- customer_agerange: string (nullable = true)
 |-- customer_zip: string (nullable = true)
 |-- customer_age: double (nullable = true)
 |-- customer_address2: string (nullable = true)
 |-- customer_incomerange: string (nullable = true)
 |-- customer_gender: string (nullable = true)
 |-- customer_customerkey: string (nullable = true)
 |-- customer_address1: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_education: string (nullable = true)
 |-- customer_birthdate: long (nullable = true)
 |-- customer_birthdate_ts: string (nullable = true)
 |-- customer_birthdate_dt: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_firstname: string (nullable = true)
 |-- transnum: long (nullable = true)
 |-- transmonth: string (nullable = true)


Nothing but a flat schema with no duplicated column names. I then execute:


hc.sql("select transid from rupbrand")


and I get:


scala> hc.sql("select transid from rupbrand")
14/10/20 10:01:44 INFO ParseDriver: Parsing command: select transid from rupbrand
14/10/20 10:01:44 INFO ParseDriver: Parse Completed
res18: org.apache.spark.sql.SchemaRDD =
SchemaRDD[121] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'transid, tree:
Project ['transid]
 LowerCaseSchema
  Subquery rupbrand
   Join Inner, Some(('sc.segcustomerid = 'st.customerid))
    Subquery sc
     Filter CAST((COUNT(DISTINCT 't.transid) > 0), BooleanType)
      Aggregate ['c.customerid], ['c.customerId AS segcustomerid#5,SUM('t.sales) AS sales#6,COUNT(DISTINCT 't.transid) AS tx_count#7]
       Filter 'c.gender IN (Male)
        Join Inner, Some(('c.customerid = 't.customerid))
         Subquery t
          Aggregate [customerid#3259,transid#3266], ['d.customerId AS customerid#1,transid#3266 AS transid#2,SUM((quantity#3262 * …


I’m wondering whether the column I select from the joined table is somehow conflicting with the
columns of the two source tables from which the join is built, since the plan shows
a breakdown of various pieces from the queries on my two source tables.
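
One way to rule out a plain name clash is to check the column names for duplicates while ignoring case, since the plan above contains a LowerCaseSchema node and mixes customerId with customerid. The following is a minimal sketch under that assumption; the helper caseInsensitiveDuplicates is hypothetical, and the names are simply typed in from the printSchema output rather than read from the SchemaRDD.

```scala
// Sketch only: the helper name is hypothetical and the input is a hand-copied
// subset of the column names printed by printSchema.
object ColumnNameCheck {
  /** Returns names that collide once case is ignored, keyed by lower-cased name. */
  def caseInsensitiveDuplicates(names: Seq[String]): Map[String, Seq[String]] =
    names
      .groupBy(_.toLowerCase)
      .filter { case (_, group) => group.size > 1 }
}

val joinedColumns = Seq("segcustomerid", "sales", "tx_count", "customerid", "transid")
val clashes = ColumnNameCheck.caseInsensitiveDuplicates(joinedColumns)
println(if (clashes.isEmpty) "no duplicate column names" else s"duplicates: $clashes")
```

An empty result only shows that the names themselves do not collide. It says nothing about whether the analyzer in 1.1.0 resolves attributes correctly for a query registered as a table.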


Help?


Thanks,

-Terry



