spark-user mailing list archives

From Helena Edelson <helena.edel...@datastax.com>
Subject Re: How to parse Json formatted Kafka message in spark streaming
Date Thu, 05 Mar 2015 15:02:47 GMT
Great point :) Cui, here's a cleaner way than I had before, without the use of Spark SQL for
the mapping:

// json4s: JsonParser.parse and .extract need an implicit Formats in scope
import org.json4s._
import org.json4s.native.JsonParser
implicit val formats: Formats = DefaultFormats

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map { case (k, v) => JsonParser.parse(v).extract[MonthlyCommits] }
  .saveToCassandra("githubstats", "monthly_commits")


HELENA EDELSON
Senior Software Engineer,  DSE Analytics 


On Mar 5, 2015, at 9:33 AM, Ted Yu <yuzhihong@gmail.com> wrote:

> Cui:
> You can check rdd.take(1).isEmpty to determine whether the RDD is empty (note that
> partitions.size only gives the partition count, which can be non-zero even for an empty RDD).
> 
> Cheers
> 
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
> When you use KafkaUtils.createStream with StringDecoders, it will return String objects
> inside your messages stream. To access the elements from the JSON, you could do something
> like the following:
> 
> 
>    // jackson-module-scala imports (package may vary by version)
>    import com.fasterxml.jackson.databind.ObjectMapper
>    import com.fasterxml.jackson.module.scala.DefaultScalaModule
>    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
> 
>    val mapStream = messages.map(x => {
>      // Note: this builds a new ObjectMapper per record; in practice you would
>      // create one per partition (e.g. via mapPartitions) and reuse it.
>      val mapper = new ObjectMapper() with ScalaObjectMapper
>      mapper.registerModule(DefaultScalaModule)
> 
>      mapper.readValue[Map[String, Any]](x).get("time")
>    })
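Pulling fields out of the resulting Map[String, Any] takes a pattern match (or a cast), since the value types are erased to Any; a stdlib-only sketch of that access pattern, with a literal map standing in for the output of mapper.readValue (the field names are only illustrative):

```scala
// Stand-in for the Map[String, Any] a JSON mapper would produce for a
// message like {"time": "2015-03-05T09:33:00", "To": "user@host", "size": 1024}.
val parsed: Map[String, Any] = Map(
  "time" -> "2015-03-05T09:33:00",
  "To"   -> "user@host",
  "size" -> 1024
)

// Values come back as Any, so match on the expected runtime type
// rather than casting blindly with asInstanceOf.
def stringField(m: Map[String, Any], key: String): Option[String] =
  m.get(key) match {
    case Some(s: String) => Some(s)
    case _               => None
  }

println(stringField(parsed, "time")) // Some(2015-03-05T09:33:00)
println(stringField(parsed, "size")) // None: present, but not a String
```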
> 
>   
> 
> Thanks
> Best Regards
> 
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui.lin@hds.com> wrote:
> Friends,
> 
> I'm trying to parse JSON-formatted Kafka messages and then send them back to Cassandra.
> I have two problems:
> 1. I got the exception below. How can I check for an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
> }
> 
> 2. How can I get all column names from the JSON messages? I have hundreds of columns in
> the JSON-formatted message.
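For the second question, once a message is parsed into a Map[String, Any], the column names are just the keys; a sketch that also walks nested objects (the dotted-path naming and sample fields are my own convention, not part of any Spark or Kafka API):

```scala
// Recursively collect every key in a parsed JSON object, using dotted
// paths for nested fields ("user.name"), so hundreds of columns can be
// listed without writing them out by hand.
def columnNames(obj: Map[String, Any], prefix: String = ""): Set[String] =
  obj.flatMap {
    case (k, nested: Map[_, _]) =>
      // Record the parent key too, then descend into the nested object.
      columnNames(nested.asInstanceOf[Map[String, Any]], s"$prefix$k.") + (prefix + k)
    case (k, _) =>
      Set(prefix + k)
  }.toSet

// Hypothetical parsed message for illustration.
val sample: Map[String, Any] = Map(
  "time" -> "2015-03-05",
  "To"   -> "user@host",
  "user" -> Map("name" -> "cui", "id" -> 7)
)

println(columnNames(sample))
```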
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin
> 
> 

