spark-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: How to parse Json formatted Kafka message in spark streaming
Date Fri, 06 Mar 2015 01:13:34 GMT
See the following thread for the 1.3.0 release:
http://search-hadoop.com/m/JW1q5hV8c4

Looks like the release is around the corner.
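
Until then, here is a minimal sketch of a guard that should also work on 1.2, assuming the stream carries (key, message) pairs as in the code quoted below. SPARK-5270 adds RDD.isEmpty in 1.3; before that, take(1) gives a cheap emptiness check. The names EmptyBatchGuard, hasData and process are made up for illustration and are not part of Spark. The schema lookup at the end is one possible way to list the inferred column names, not the only one.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; the names are illustrative only.
object EmptyBatchGuard {

  // True when the RDD has at least one element.
  // take(1) returns as soon as it finds one, so no full scan is needed.
  def hasData[T](rdd: RDD[T]): Boolean = rdd.take(1).nonEmpty

  // Skips empty batches so jsonRDD never has to infer a schema from nothing.
  def process(messages: DStream[(String, String)], sqlContext: SQLContext): Unit = {
    messages.foreachRDD { rdd =>
      val json: RDD[String] = rdd.map(_._2) // keep only the message payload
      if (hasData(json)) {
        val table = sqlContext.jsonRDD(json)
        table.registerTempTable("tempTable")
        // Column names as inferred from this batch's JSON documents.
        println(table.schema.fields.map(_.name).mkString(", "))
      }
    }
  }
}
```

Once 1.3 is out, hasData(json) could presumably be replaced with !json.isEmpty.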

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin <cui.lin@hds.com> wrote:

>   Hi, Ted,
>
>  Thanks for your reply. According to the link below, partitions.size
> does not work for checking for an empty RDD in streams. It seems the problem
> is fixed in Spark 1.3, which is not yet available for download?
>
>  https://issues.apache.org/jira/browse/SPARK-5270
>  Best regards,
>
>  Cui Lin
>
>   From: Ted Yu <yuzhihong@gmail.com>
> Date: Thursday, March 5, 2015 at 6:33 AM
> To: Akhil Das <akhil@sigmoidanalytics.com>
> Cc: Cui Lin <Cui.Lin@hds.com>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to parse Json formatted Kafka message in spark streaming
>
>   Cui:
> You can check messages.partitions.size to determine whether messages is
> an empty RDD.
>
>  Cheers
>
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>
>>  When you use KafkaUtils.createStream with StringDecoders, it will
>> return String objects inside your messages stream. To access the elements
>> from the json, you could do something like the following:
>>
>>
>>     val mapStream = messages.map(x => {
>>       val mapper = new ObjectMapper() with ScalaObjectMapper
>>       mapper.registerModule(DefaultScalaModule)
>>
>>       // x is a (key, message) pair; parse the message part
>>       mapper.readValue[Map[String, Any]](x._2).get("time")
>>     })
>>
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui.lin@hds.com> wrote:
>>
>>>   Friends,
>>>
>>>   I'm trying to parse JSON-formatted Kafka messages and then write them
>>> to Cassandra. I have two problems:
>>>
>>>    1. I got the exception below. How do I check for an empty RDD?
>>>
>>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>>> empty collection
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at scala.Option.getOrElse(Option.scala:120)
>>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>>
>>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
>>>
>>> messages.foreachRDD { rdd =>
>>>   val message:RDD[String] = rdd.map { y => y._2 }
>>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>>   sqlContext.sql("SELECT time,To FROM tempTable")
>>>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
>>> }
>>>
>>>
>>>  2. How can I get all column names from the JSON messages? I have
>>> hundreds of columns in each JSON-formatted message.
>>>
>>>  Thanks for your help!
>>>
>>>
>>>
>>>
>>>  Best regards,
>>>
>>>  Cui Lin
>>>
>>
>>
>
