spark-user mailing list archives

From YaoPau <jonrgr...@gmail.com>
Subject Joining DStream with static file
Date Thu, 20 Nov 2014 01:13:21 GMT
Here is my attempt:

val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc =  new StreamingContext(sparkConf, Seconds(2))

val sc = new SparkContext()
val geoData = sc.textFile("data/geoRegion.csv")
            .map(_.split(','))
            .map(line => (line(0), (line(1),line(2),line(3),line(4))))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

// details removed for brevity
val goodIPsFltrBI = lines.filter(...).map(...).filter(...)
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd => rdd.join(geoData))

This is very wrong.  I have a feeling I should be broadcasting geoData
instead of reading it in with each task (it's a 100 MB file), but I'm not
sure where to put the code that maps the CSV file to the final geoData RDD.

Also I'm not sure if geoData is even defined correctly (maybe it should use
ssc instead of sc?).  Please advise.
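
One way the broadcast idea might look, as a sketch rather than a confirmed
fix: build the lookup table once on the driver, broadcast it, and do the join
as a per-partition map lookup. The object GeoLookup, the type alias Geo, and
the helpers parseGeo and joinWithGeo are hypothetical names, not Spark APIs.
The sketch assumes one CSV row per key, that goodIPsFltrBI is keyed by the
same string as column 0 of the file, and that the file is readable from the
driver. The core is plain Scala; the Spark wiring is shown only in comments
and is untested.

```scala
// Sketch only: pure-Scala core of a broadcast (map-side) join.
// GeoLookup, Geo, parseGeo and joinWithGeo are hypothetical names.
object GeoLookup {
  // Mirrors (line(1), line(2), line(3), line(4)) from the code above.
  type Geo = (String, String, String, String)

  // Turn raw CSV lines into an in-memory lookup table keyed by column 0.
  // Rows with fewer than five columns are skipped (an assumption; the
  // code above would throw on such a row).
  def parseGeo(csvLines: Iterator[String]): Map[String, Geo] =
    csvLines
      .map(_.split(','))
      .collect { case f if f.length >= 5 => (f(0), (f(1), f(2), f(3), f(4))) }
      .toMap

  // Inner join of keyed records against the lookup table. Records whose
  // key is absent are dropped, as an inner rdd.join would drop them.
  def joinWithGeo[V](records: Iterator[(String, V)],
                     geo: Map[String, Geo]): Iterator[(String, (V, Geo))] =
    records.flatMap { case (k, v) => geo.get(k).map(g => (k, (v, g))) }
}

// Possible wiring inside the streaming job (untested; names from the post):
//   val sc = ssc.sparkContext   // reuse the context that ssc already owns
//   val geoBc = sc.broadcast(GeoLookup.parseGeo(
//     scala.io.Source.fromFile("data/geoRegion.csv").getLines()))
//   val vdpJoinedGeo = goodIPsFltrBI.mapPartitions(
//     part => GeoLookup.joinWithGeo(part, geoBc.value))
```

On the ssc versus sc question: a StreamingContext exposes its underlying
SparkContext as ssc.sparkContext, so reusing that avoids constructing a
second context in the same application.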
