spark-user mailing list archives

From Jon Gregg <jonrgr...@gmail.com>
Subject Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.
Date Sun, 20 Nov 2016 21:10:23 GMT
In these cases it might help to just flatten the DataFrame. Here's a
helper function from the tutorial (scroll down to the "Flattening" header):
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/02%20Introduction%20to%20DataFrames%20-%20scala.html
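The helper itself is not quoted in this message. Below is a minimal sketch of the same idea, assuming Spark 2.x and the schema from the original question. It recursively expands struct fields into top-level columns. The object and case-class names (`FlattenSketch`, `AddressRow`, `PersonRow`), the sample data, and the underscore aliasing are hypothetical additions, not necessarily what the tutorial does. Because `addresses` is an array rather than a struct, it is exploded into one struct per row before flattening.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

object FlattenSketch {
  // Hypothetical case classes mirroring the schema printed in the question.
  case class AddressRow(street: String, city: String)
  case class PersonRow(name: String, addresses: Seq[AddressRow])

  // Recursively walk a schema and return one Column per leaf field.
  // Nested structs become dotted paths ("address.city"), aliased with
  // underscores ("address_city") so the flattened names stay unique.
  // Arrays are treated as leaves; explode them first to reach their fields.
  def flattenSchema(schema: StructType, prefix: String = ""): Array[Column] =
    schema.fields.flatMap { f =>
      val colName = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case st: StructType => flattenSchema(st, colName)
        case _              => Array(col(colName).as(colName.replace(".", "_")))
      }
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("flatten sketch")
      .getOrCreate()
    import spark.implicits._

    val people = Seq(
      PersonRow("ann", Seq(AddressRow("1 Main St", "Austin"), AddressRow("2 Oak Ave", "Boston")))
    ).toDF()

    // One row per address: the array<struct> becomes a plain struct column.
    val exploded = people.select($"name", explode($"addresses").as("address"))

    // Resulting columns: name, address_street, address_city
    val flat = exploded.select(flattenSchema(exploded.schema): _*)
    flat.show()

    spark.stop()
  }
}
```

The aliasing step is a design choice: without it, flattening two structs that share a field name (say, two different `city` fields) would produce duplicate column names.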



On Sun, Nov 20, 2016 at 1:24 PM, pandees waran <pandeesh@gmail.com> wrote:

> Have you tried using the "." access method?
>
> e.g:
> ds2.select("name", "addresses.city")
>
> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande <deshpandeshyla@gmail.com> wrote:
>
>> The following is my DataFrame schema:
>>
>>     root
>>      |-- name: string (nullable = true)
>>      |-- addresses: array (nullable = true)
>>      |    |-- element: struct (containsNull = true)
>>      |    |    |-- street: string (nullable = true)
>>      |    |    |-- city: string (nullable = true)
>>
>> I want to output name and city. The following is my Spark Streaming app,
>> which outputs name and addresses, but I want name and cities in the output.
>>
>>     object PersonConsumer {
>>       import org.apache.spark.sql.SparkSession
>>       import com.example.protos.demo._
>>
>>       def main(args: Array[String]): Unit = {
>>
>>         val spark = SparkSession.builder
>>           .master("local")
>>           .appName("spark session example")
>>           .getOrCreate()
>>
>>         import spark.implicits._
>>
>>         // Subscribe to the "person" Kafka topic; each record's payload
>>         // arrives in the binary "value" column.
>>         val ds1 = spark.readStream
>>           .format("kafka")
>>           .option("kafka.bootstrap.servers", "localhost:9092")
>>           .option("subscribe", "person")
>>           .load()
>>
>>         // Decode each payload into a protobuf Person, then keep name and addresses.
>>         val ds2 = ds1
>>           .map(row => row.getAs[Array[Byte]]("value"))
>>           .map(Person.parseFrom(_))
>>           .select($"name", $"addresses")
>>
>>         ds2.printSchema()
>>
>>         // Print each micro-batch to stdout.
>>         val query = ds2.writeStream
>>           .outputMode("append")
>>           .format("console")
>>           .start()
>>
>>         query.awaitTermination()
>>       }
>>     }
>>
>> Appreciate your help. Thanks.
>>
>
>
>
> --
> Thanks,
> Pandeeswaran
>
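The "." access suggested above applies to `ds2`, the decoded Dataset, rather than to the raw Kafka `ds1`. The path also has no `element` segment, because that word is only how `printSchema` labels array elements. Below is a minimal sketch of three variants on a small static DataFrame, assuming Spark 2.x. The object and case-class names (`CityAccessSketch`, `AddressRow`, `PersonRow`) and the sample data are hypothetical. The variants are: all cities as one array per person, only the first city, and one row per (name, city) pair.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object CityAccessSketch {
  // Hypothetical case classes mirroring the schema printed in the question.
  case class AddressRow(street: String, city: String)
  case class PersonRow(name: String, addresses: Seq[AddressRow])

  // "addresses.city" extracts the city field from every array element,
  // so each person gets a single array<string> column named "city".
  def citiesAsArray(people: DataFrame): DataFrame =
    people.select(col("name"), col("addresses.city"))

  // A SQL expression can index the array to keep only the first address's city.
  def firstCity(people: DataFrame): DataFrame =
    people.selectExpr("name", "addresses[0].city AS first_city")

  // explode turns the array of cities into one output row per (name, city).
  def namesAndCities(people: DataFrame): DataFrame =
    people.select(col("name"), explode(col("addresses.city")).as("city"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("city access sketch")
      .getOrCreate()
    import spark.implicits._

    val people = Seq(
      PersonRow("ann", Seq(AddressRow("1 Main St", "Austin"), AddressRow("2 Oak Ave", "Boston"))),
      PersonRow("bob", Seq(AddressRow("3 Elm Rd", "Denver")))
    ).toDF()

    citiesAsArray(people).show()
    firstCity(people).show()
    namesAndCities(people).show()

    spark.stop()
  }
}
```

In the streaming app from the question, the same select can be applied to `ds2` before `writeStream`, for example `ds2.select($"name", explode($"addresses.city").as("city"))` with `org.apache.spark.sql.functions.explode` imported. This should print one (name, city) row per address. Because explode works row by row, it is expected to be compatible with the "append" output mode.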
