spark-user mailing list archives

From Michael Armbrust <mich...@databricks.com>
Subject Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..
Date Tue, 22 Nov 2016 01:00:48 GMT
You could also do this with Datasets, which will probably be a little more
efficient, since you are telling us that you only care about one column:

ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)
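A minimal batch-mode sketch of the same select-then-map pattern follows, assuming spark-sql 2.x or later on the classpath. A static DataFrame with a binary `value` column stands in for the Kafka source. `Student` here is a hypothetical case class whose `parseFrom` decodes a made-up UTF-8 `name,age` format in place of a real protobuf message, and `ParseValueColumn` is likewise a hypothetical name.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the protobuf-generated Student class.
case class Student(name: String, age: Int)

object Student {
  // Hypothetical wire format for this sketch only: UTF-8 text "name,age".
  // A real generated Student.parseFrom would decode protobuf bytes instead.
  def parseFrom(bytes: Array[Byte]): Student = {
    val text = new String(bytes, StandardCharsets.UTF_8)
    val comma = text.indexOf(',')
    require(comma >= 0, s"malformed record: $text")
    Student(text.substring(0, comma), text.substring(comma + 1).trim.toInt)
  }
}

object ParseValueColumn {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parse value column")
      .getOrCreate()
    import spark.implicits._

    // Static stand-in for the Kafka source: one binary column named "value".
    val ds1 = Seq("alice,20", "bob,22")
      .map(_.getBytes(StandardCharsets.UTF_8))
      .toDF("value")

    // Keep only "value", view it as Array[Byte], then decode each row.
    val students = ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)

    students.show()
    spark.stop()
  }
}
```

With the real Kafka source, `ds1` would come from `spark.readStream.format("kafka")...load()` as in the quoted code, and the result would be written with `writeStream` rather than `show()`. The `select` drops the other Kafka columns (`key`, `topic`, `partition`, `offset`, `timestamp`, `timestampType`) before the typed `map`.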

On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande <deshpandeshyla@gmail.com>
wrote:

> Hello everyone,
>  The following code works ...
>
> def main(args : Array[String]) {
>
>   val spark = SparkSession.builder.
>     master("local")
>     .appName("spark session example")
>     .getOrCreate()
>
>   import spark.implicits._
>
>   val ds1 = spark.readStream.format("kafka").
>     option("kafka.bootstrap.servers","localhost:9092").
>     option("subscribe","student").load()
>
>   val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
>
>   val query = ds2.writeStream
>     .outputMode("append")
>     .format("console")
>     .start()
>
>   query.awaitTermination()
>
> }
>
>
> On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <
> deshpandeshyla@gmail.com> wrote:
>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val dframe1 = spark.readStream.format("kafka").
>>   option("kafka.bootstrap.servers","localhost:9092").
>>   option("subscribe","student").load()
>>
>> *How do I deserialize the value column of dframe1, which is Array[Byte], into a Student object using Student.parseFrom?*
>>
>> *Please help.*
>>
>> *Thanks.*
>>
>>
>>
>> // Stream of votes from Kafka as bytes
>> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>>   ssc, LocationStrategies.PreferConsistent,
>>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>> // Parse them into Vote case class.
>> val votes: DStream[Vote] = votesAsBytes.map {
>>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>>     Vote.parseFrom(cr.value())
>> }
>>
>>
>
