spark-user mailing list archives

From Divya Gehlot <divya.htco...@gmail.com>
Subject pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala
Date Fri, 05 Feb 2016 07:41:32 GMT
Hi,
I have two input datasets.
The first input dataset looks like this:

    year,make,model,comment,blank
    "2012","Tesla","S","No comment",
    1997,Ford,E350,"Go get one now they are going fast",
    2015,Chevy,Volt


The second input dataset:

    TagId,condition
    1997_cars,year = 1997 and model = 'E350'
    2012_cars,year=2012 and model ='S'
    2015_cars ,year=2015 and model = 'Volt'


My requirement is to read the first dataset and, based on the filter
conditions in the second dataset, tag each row of the first dataset by
adding a new column, TagId.
The expected output should look like this:

    year,make,model,comment,blank,TagId
    "2012","Tesla","S","No comment",2012_cars
    1997,Ford,E350,"Go get one now they are going fast",1997_cars
    2015,Chevy,Volt, ,2015_cars
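The tagging described above can be expressed as a single SQL `CASE WHEN` expression built from the rows of the second dataset, where the first matching condition supplies the TagId. A minimal sketch, assuming the tag table is small enough to bring to the driver and that tag ids contain no single quotes; `TagSql` and `caseWhenSql` are hypothetical names, not part of Spark:

```scala
object TagSql {
  // Hypothetical helper (not a Spark API): turns (TagId, condition) pairs into
  // one SQL CASE expression. The first matching condition wins; rows that
  // match no condition get NULL because there is no ELSE branch.
  def caseWhenSql(rules: Seq[(String, String)]): String = {
    require(rules.nonEmpty, "need at least one (TagId, condition) pair")
    val branches = rules.map { case (tagId, condition) =>
      // Trim ids such as "2015_cars " (the sample data has a trailing space).
      val tag = tagId.trim
      // Assumption: tag ids never contain a single quote, so no escaping is done.
      require(!tag.contains("'"), s"unsupported quote in tag id: $tag")
      // Parenthesize each condition so its own AND/OR precedence is kept.
      s"WHEN ($condition) THEN '$tag'"
    }
    branches.mkString("CASE ", " ", " END")
  }
}
```

In Spark, the resulting string could then be handed to `org.apache.spark.sql.functions.expr` (added in 1.5), e.g. `dfcars.withColumn("TagId", expr(TagSql.caseWhenSql(rules)))`, where `rules` is built from `dftags.collect()`. This assumes the 1.5 expression parser accepts the condition strings as written.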


I tried the following:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    val sqlContext = new SQLContext(sc)
    val carsSchema = StructType(Seq(
        StructField("year", IntegerType, true),
        StructField("make", StringType, true),
        StructField("model", StringType, true),
        StructField("comment", StringType, true),
        StructField("blank", StringType, true)))

    val carTagsSchema = StructType(Seq(
        StructField("TagId", StringType, true),
        StructField("condition", StringType, true)))

    val dfcars = sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
      .schema(carsSchema).load("/TestDivya/Spark/cars.csv")
    val dftags = sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
      .schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")

    val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
    val cdtnval = dftags.select("condition")
    val df2 = dfcars.filter(cdtnval)
    <console>:35: error: overloaded method value filter with alternatives:
      (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
      (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
     cannot be applied to (org.apache.spark.sql.DataFrame)
           val df2=dfcars.filter(cdtnval)
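The error lists a `filter(conditionExpr: String)` overload, so one possible workaround is to collect the condition strings to the driver and call `filter` once per condition, labelling and unioning the results. A minimal sketch, assuming the Spark 1.5 `DataFrame` API and a small, non-empty tag table; `TagByFilter` and `tagByFilter` are hypothetical names:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object TagByFilter {
  // Hypothetical helper: runs each condition string through
  // DataFrame.filter(conditionExpr: String) and tags the matching rows.
  def tagByFilter(cars: DataFrame, tags: DataFrame): DataFrame = {
    // Assumption: the tag table is small, so collecting it is cheap.
    val rules = tags.select("TagId", "condition").collect()
      .map(r => (r.getString(0).trim, r.getString(1)))
    require(rules.nonEmpty, "tag table must not be empty")
    rules
      .map { case (tagId, cond) => cars.filter(cond).withColumn("TagId", lit(tagId)) }
      .reduce(_ unionAll _) // unionAll is the Spark 1.x name for UNION ALL
  }
}
```

Note the differences from the expected output: rows matching no condition are dropped, rows matching several conditions appear once per match, and each condition triggers a separate scan of `cars`.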


Another approach:

    val col = dftags.col("TagId")
    val finaldf = dfcars.withColumn("TagId", col)
    org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];

    finaldf.write.format("com.databricks.spark.csv").option("header", "true").save("/TestDivya/Spark/carswithtags.csv")



I would really appreciate it if somebody could give me pointers on how to
pass the filter conditions (from the second DataFrame) to the filter
function of the first DataFrame, or suggest another solution.
My apologies for such a naive question; I am new to Scala and Spark.
Thanks
