spark-user mailing list archives

From Lohith Samaga M <>
Subject RE: pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala
Date Fri, 05 Feb 2016 09:56:46 GMT
If you can also format the condition file as a CSV file similar to the main file, then you can join the two DataFrames and select only the required columns.
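Below is a minimal sketch of that join. It assumes the condition file is rewritten as a CSV with one column per condition field, laid out as `TagId,year,model`. That layout and the in-memory sample data are assumptions for illustration; only the car rows and tag names come from the question. A left outer join keeps cars that match no tag, and their TagId is null. The sketch uses the three-argument `join(right, joinExprs, joinType)` form because Spark 1.5 already provides it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Local context so the sketch runs on its own; in spark-shell, reuse the existing `sc` instead.
val sc = new SparkContext(new SparkConf().setAppName("join-tags").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Main file: the two car rows from the question.
val cars = sqlContext.createDataFrame(Seq(
  (2012, "Tesla", "S", "No comment"),
  (1997, "Ford", "E350", "Go get one now they are going fast")
)).toDF("year", "make", "model", "comment")

// Hypothetical reformatted condition file: one column per field instead of a SQL string.
val tags = sqlContext.createDataFrame(Seq(
  ("1997_cars", 1997, "E350"),
  ("2012_cars", 2012, "S"),
  ("2015_cars", 2015, "Volt")
)).toDF("TagId", "year", "model")

// Left outer join on the condition fields, then keep the car columns plus TagId.
val tagged = cars
  .join(tags, cars("year") === tags("year") && cars("model") === tags("model"), "left_outer")
  .select(cars("year"), cars("make"), cars("model"), cars("comment"), tags("TagId"))

val rows = tagged.collect()
rows.foreach(println)
sc.stop()
```

The join only works when every condition is a plain equality on known columns. Arbitrary expressions such as ranges would need a different approach.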

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga

From: Divya Gehlot []
Sent: Friday, February 05, 2016 13.12
To: user @spark
Subject: pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

I have two input datasets.
The first input dataset looks like this:

    "2012","Tesla","S","No comment",
    1997,Ford,E350,"Go get one now they are going fast",

Second input dataset:

    1997_cars,year = 1997 and model = 'E350'
    2012_cars,year=2012 and model ='S'
    2015_cars ,year=2015 and model = 'Volt'

My requirement is to read the first dataset and, based on the filter conditions in the second dataset, tag each row of the first dataset by adding a new column, TagId. The expected output should look like this:

    "2012","Tesla","S","No comment",2012_cars
    1997,Ford,E350,"Go get one now they are going fast",1997_cars
    2015,Chevy,Volt, ,2015_cars

I tried the following:

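    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    val sqlContext = new SQLContext(sc)
    val carsSchema = StructType(Seq(
        StructField("year", IntegerType, true),
        StructField("make", StringType, true),
        StructField("model", StringType, true),
        StructField("comment", StringType, true),
        StructField("blank", StringType, true)))

    val carTagsSchema = StructType(Seq(
        StructField("TagId", StringType, true),
        StructField("condition", StringType, true)))

    // Load both files with the spark-csv package.
    // carsPath / tagsPath are hypothetical placeholders: the original paths did not survive in the archive.
    val dfcars = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carsSchema).load(carsPath)
    val dftags = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carTagsSchema).load(tagsPath)

    val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
    // cdtnval is a DataFrame here, not a String or Column, which causes the error below
    val cdtnval = dftags.select("condition")
    val df2=dfcars.filter(cdtnval)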
    <console>:35: error: overloaded method value filter with alternatives:
      (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
      (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
     cannot be applied to (org.apache.spark.sql.DataFrame)
           val df2=dfcars.filter(cdtnval)
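
`filter` accepts either a SQL expression `String` or a `Column`, but `dftags.select("condition")` is a whole DataFrame, so the call does not compile. The sketch below shows one possible fix. It assumes the tag table is small enough to collect to the driver. Each `(TagId, condition)` pair is pulled out as plain strings. The cars are filtered once per condition, each result is stamped with `lit(tagId)`, and the pieces are unioned together. The sample data is inlined for illustration. With this approach, a car that matches no condition is dropped, and a car that matches several conditions appears once per match.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.lit

// Local context so the sketch runs on its own; in spark-shell, reuse the existing `sc` instead.
val sc = new SparkContext(new SparkConf().setAppName("filter-per-tag").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

val dfcars = sqlContext.createDataFrame(Seq(
  (2012, "Tesla", "S", "No comment"),
  (1997, "Ford", "E350", "Go get one now they are going fast")
)).toDF("year", "make", "model", "comment")

val dftags = sqlContext.createDataFrame(Seq(
  ("1997_cars", "year = 1997 and model = 'E350'"),
  ("2012_cars", "year=2012 and model ='S'"),
  ("2015_cars", "year=2015 and model = 'Volt'")
)).toDF("TagId", "condition")

// Collect the (small) tag table so each condition becomes a plain String.
val rules = dftags.collect().map(r => (r.getString(0), r.getString(1)))

// filter(String) parses the SQL expression; lit(tagId) adds the tag as a constant column.
val tagged = rules
  .map { case (tagId, condition) => dfcars.filter(condition).withColumn("TagId", lit(tagId)) }
  .reduce(_ unionAll _)

val rows = tagged.collect()
rows.foreach(println)
sc.stop()
```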

Another way:

    val col = dftags.col("TagId")
    val finaldf = dfcars.withColumn("TagId", col)
    org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];

    finaldf.write.format("com.databricks.spark.csv").option("header", "true").save("/TestDivya/Spark/carswithtags.csv")

I would really appreciate it if somebody could give me pointers on how to pass the filter condition (from the second DataFrame) to the filter function of the first DataFrame, or suggest another solution.
My apologies for such a naive question; I am new to Scala and Spark.
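Another possible sketch evaluates every car in a single pass. It assumes Spark 1.5 or later, where `functions.expr` is available, and a tag table small enough to collect. The collected conditions are folded into one nested `when(...).otherwise(...)` column. As a result, unmatched cars get a null TagId and the first matching rule wins. The condition strings are parsed as Spark SQL, so they should come from a trusted source. The sample data is inlined for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, SQLContext}
import org.apache.spark.sql.functions.{expr, lit, when}

// Local context so the sketch runs on its own; in spark-shell, reuse the existing `sc` instead.
val sc = new SparkContext(new SparkConf().setAppName("tag-by-condition").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

val dfcars = sqlContext.createDataFrame(Seq(
  (2012, "Tesla", "S", "No comment"),
  (1997, "Ford", "E350", "Go get one now they are going fast")
)).toDF("year", "make", "model", "comment")

val dftags = sqlContext.createDataFrame(Seq(
  ("1997_cars", "year = 1997 and model = 'E350'"),
  ("2012_cars", "year=2012 and model ='S'"),
  ("2015_cars", "year=2015 and model = 'Volt'")
)).toDF("TagId", "condition")

// Bring the small tag table to the driver as (TagId, condition) strings.
val rules: Array[(String, String)] =
  dftags.collect().map(r => (r.getString(0), r.getString(1)))

// Build CASE WHEN cond1 THEN tag1 ELSE (CASE WHEN cond2 THEN tag2 ELSE ... NULL END) END.
// foldRight makes the first rule the outermost `when`, so earlier rules take priority.
val tagCol: Column = rules.foldRight(lit(null).cast("string")) {
  case ((tagId, condition), fallback) => when(expr(condition), tagId).otherwise(fallback)
}

val tagged = dfcars.withColumn("TagId", tagCol)
val rows = tagged.collect()
rows.foreach(println)
sc.stop()
```

Unlike a filter-per-condition approach, this keeps exactly one output row per input car. The result can be written out with the same `write.format("com.databricks.spark.csv")` call used in the question.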
