spark-user mailing list archives

From Cyril Scetbon <cyril.scet...@free.fr>
Subject Re: Using data frames to join separate RDDs in spark streaming
Date Wed, 01 Jun 2016 18:05:58 GMT
It seems that to join a DStream with an RDD I can use either:

mgs.transform(rdd => rdd.join(rdd1))

or

mgs.foreachRDD(rdd => rdd.join(rdd1))
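
As a minimal sketch of the first form, the following joins each micro-batch of a DStream with a static pair RDD through transform. The object name, the sample data, and the in-memory queueStream source are hypothetical stand-ins (the original rdd1 comes from Elasticsearch), and Spark core and streaming are assumed to be on the classpath. Note that in the foreachRDD form the join is lazy and its result is discarded unless an action is called on it, which is why the sketch collects the joined batches explicitly.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object JoinDStreamWithRdd {
  // Runs a local streaming job and returns the joined records it produced.
  def run(): Set[(String, (Int, String))] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("join-dstream-rdd-sketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sc = ssc.sparkContext

    // Hypothetical stand-in for rdd1: a static pair RDD keyed by id.
    val rdd1: RDD[(String, String)] = sc.parallelize(Seq("a" -> "doc-a", "b" -> "doc-b"))

    // Hypothetical stand-in for mgs: a DStream fed from an in-memory queue.
    val queue = mutable.Queue[RDD[(String, Int)]](sc.parallelize(Seq("a" -> 1, "c" -> 3)))
    val mgs = ssc.queueStream(queue)

    // transform returns a new DStream whose batches hold the join result.
    val joined = mgs.transform(rdd => rdd.join(rdd1))

    // foreachRDD runs on the driver; collect() is the action that triggers the join.
    val results = new ConcurrentLinkedQueue[(String, (Int, String))]()
    joined.foreachRDD { rdd => rdd.collect().foreach(r => results.add(r)) }

    ssc.start()
    // Wait for the first non-empty batch, with an upper bound of 20 seconds.
    val deadline = System.currentTimeMillis() + 20000L
    while (results.isEmpty && System.currentTimeMillis() < deadline) Thread.sleep(200)
    ssc.stop(stopSparkContext = true, stopGracefully = false)

    results.asScala.toSet
  }
}
```

Calling JoinDStreamWithRdd.run() keeps only the keys present on both sides, since join is an inner join.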

But I can't see why rdd1.toDF("id", "aid") actually triggers SPARK-5063.
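
For the DataFrame route, one arrangement that could be tried is to build the DataFrame from the static RDD once on the driver, before the streaming job starts, and to convert only the per-batch RDD inside foreachRDD. This is a sketch under assumptions, not a confirmed fix: whether it avoids SPARK-5063 in the original job is not established here. The object name, the sample data, the doc_id alias, and the queueStream source are hypothetical. SQLContext.getOrCreate is used to match the original snippet; it is deprecated in Spark 2.x and later.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.explode
import org.apache.spark.streaming.{Seconds, StreamingContext}

object JoinWithDataFrames {
  // Runs a local streaming job and returns (doc id, matched stream id) pairs.
  def run(): Set[(String, String)] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("join-dataframes-sketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sc = ssc.sparkContext
    val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._

    // Hypothetical stand-in for rdd1: each id carries a list of aids.
    val rdd1: RDD[(String, Seq[String])] =
      sc.parallelize(Seq("doc1" -> Seq("a", "b"), "doc2" -> Seq("c")))

    // Built once on the driver, outside foreachRDD.
    val df = rdd1.toDF("id", "aid")
    val exploded = df.select(explode(df("aid")).as("aid"), df("id").as("doc_id"))

    // Hypothetical stand-in for mgs.map(x => x._1): a DStream of ids.
    val queue = mutable.Queue[RDD[String]](sc.parallelize(Seq("a", "c", "z")))
    val ids = ssc.queueStream(queue)

    val results = new ConcurrentLinkedQueue[(String, String)]()
    ids.foreachRDD { rdd =>
      // Only the per-batch RDD is converted inside the closure.
      val dfAids = rdd.toDF("id")
      exploded
        .join(dfAids, exploded("aid") === dfAids("id"))
        .select(exploded("doc_id"), dfAids("id"))
        .collect()
        .foreach(row => results.add((row.getString(0), row.getString(1))))
    }

    ssc.start()
    // Wait for the first non-empty batch, with an upper bound of 20 seconds.
    val deadline = System.currentTimeMillis() + 20000L
    while (results.isEmpty && System.currentTimeMillis() < deadline) Thread.sleep(200)
    ssc.stop(stopSparkContext = true, stopGracefully = false)

    results.asScala.toSet
  }
}
```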

 
> On Jun 1, 2016, at 12:00, Cyril Scetbon <cyril.scetbon@free.fr> wrote:
> 
> Hi guys,
> 
> I have two input data streams that I want to join using DataFrames, and unfortunately I get the message produced by https://issues.apache.org/jira/browse/SPARK-5063 because I can't reference rdd1 in (2):
> 
> (1)
> val rdd1 = sc.esRDD(es_resource.toLowerCase, query)
>                     .map(r => (r._1, r._2))
> 
> (2)
> mgs.map(x => x._1)
>   .foreachRDD { rdd =>
>     val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
>     import sqlContext.implicits._
> 
>     val df_aids = rdd.toDF("id")
> 
>     val df = rdd1.toDF("id", "aid")
> 
>     df.select(explode(df("aid")).as("aid"), df("id"))
>       .join(df_aids, $"aid" === df_aids("id"))
>       .select(df("id"), df_aids("id"))
>       .....
>   }
> 
> Is there a way to still use DataFrames for this, or do I need to do everything with RDD joins only?
> And if I need to use only RDD joins, how do I do it, given that I have an RDD (rdd1) and a DStream (mgs)?
> 
> Thanks
> -- 
> Cyril SCETBON
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
> 

