spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: how can I dynamic parse json in kafka when using Structured Streaming
Date Tue, 17 Sep 2019 08:13:11 GMT
You can use the *from_json* built-in SQL function to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-
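
For example, something like this (a minimal sketch, not tested; the
bootstrap servers, topic name, and JSON field names are placeholders
for your setup):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("kafka-json").getOrCreate()
import spark.implicits._

// from_json needs the schema up front, so define one that matches
// the JSON you expect (field names here are made up).
val schema = new StructType()
  .add("name", StringType)
  .add("count", IntegerType)

val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "mytopic")
  .load()

// Cast the Kafka value bytes to a string, parse the JSON into a
// struct column, then flatten it into top-level columns.
val parsed = messages
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json($"value", schema).as("data"))
  .select("data.*")

Note that from_json takes a fixed schema rather than inferring one per
record, so if your JSON varies you have to declare the superset of
fields you care about; fields missing from a record come back as null.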

On Mon, Sep 16, 2019 at 7:39 PM lk_spark <lk_spark@163.com> wrote:

> hi, all:
>     I'm using Structured Streaming to read from Kafka. The data is a JSON
> string; I want to parse it and convert it to a DataFrame, but my code
> doesn't compile and I don't know how to fix it:
>
>
> val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]
>
> val words = lines.map(line => {
>   var json: JValue = null
>   try {
>     json = parse(line)
>   } catch {
>     case ex: Exception => { println(ex.getMessage + " " + line) }
>   }
>   //var result: scala.collection.mutable.Map[String,String] = scala.collection.mutable.Map()
>   val jsonObj = json.values.asInstanceOf[Map[String, _]]
>   val values = jsonObj.values.toArray
>   val schema = StructType(List())
>   for ((k, v) <- jsonObj) {
>     //result += (k -> jsonObj.get(k).toString())
>     if (v.isInstanceOf[String]) {
>       schema.add(k, StringType)
>     } else if (v.isInstanceOf[Int]) {
>       schema.add(k, IntegerType)
>     } /*else if (v.isInstanceOf[Array[String]]) {
>       schema.add(k, ArrayType(StringType))
>     } else if (v.isInstanceOf[Map[String,String]]) {
>       schema.add(k, MapType(StringType, StringType))
>     }*/
>   }
>   val row = new GenericRowWithSchema(values, schema)
>   row
> })
>
>
> Error:(45, 26) Unable to find encoder for type
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit
> Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is
> needed to store
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in
> a Dataset. Primitive types (Int, String, etc) and Product types (case
> classes) are supported by importing spark.implicits._  Support for
> serializing other types will be added in future releases.
>     val words = lines.map(line => {
>
> Error:(45, 26) not enough arguments for method map: (implicit evidence$6:
> org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
> Unspecified value parameter evidence$6.
>     val words = lines.map(line => {
>
>
>
> 2019-09-17
> ------------------------------
> lk_spark
>
