spark-user mailing list archives

From shyla deshpande <deshpandesh...@gmail.com>
Subject Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.
Date Sun, 20 Nov 2016 21:44:53 GMT
Thanks Jon, great learning resource.
Thanks Pandees, addresses[0].city would work, but I want all the cities, not
just the one from addresses[0].
Finally, I wrote the following function to get the cities:

    def getCities(addresses: Seq[Address]): String = {
      var cities: String = ""
      if (addresses.nonEmpty) {
        cities = (for (a <- addresses) yield a.city.getOrElse("")).mkString(",")
        // cities = addresses.foldLeft("")((str, addr) => str + addr.city.getOrElse(""))
      }
      cities
    }
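
A shorter equivalent, for the archives (a hedged sketch; it assumes
Address.city is an Option[String], which the getOrElse above suggests):

    // Collapse the Options directly: addresses with no city are dropped
    // instead of appearing as empty strings between commas.
    def getCities(addresses: Seq[Address]): String =
      addresses.flatMap(_.city).mkString(",")

Staying at the DataFrame level should also work: selecting a struct field
through an array column yields an array of that field, so something like
ds2.select($"name", $"addresses.city") should return all the cities without
a helper function.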

Great help. Thanks again


On Sun, Nov 20, 2016 at 1:10 PM, Jon Gregg <jonrgregg@gmail.com> wrote:

> In these cases it might help to just flatten the DataFrame.  Here's a
> helper function from the tutorial (scroll down to the "Flattening" header):
> https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/02%20Introduction%20to%20DataFrames%20-%20scala.html
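>
> A minimal sketch in that spirit (hedged: this is not the tutorial's exact
> helper, and it assumes the ds2 Dataset and schema from the original message
> below). explode turns each array element into its own row, after which the
> nested field is an ordinary column:
>
>     import org.apache.spark.sql.functions.explode
>
>     // one output row per (name, address) pair; "addr" is a struct column
>     val flat = ds2.select($"name", explode($"addresses").as("addr"))
>     flat.select($"name", $"addr.city")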
>
>
> On Sun, Nov 20, 2016 at 1:24 PM, pandees waran <pandeesh@gmail.com> wrote:
>
>> Have you tried using the "." access method?
>>
>> e.g.:
>> ds1.select("name", "addresses[0].city")
>>
>> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande <
>> deshpandeshyla@gmail.com> wrote:
>>
>>> The following is my dataframe schema:
>>>
>>>     root
>>>      |-- name: string (nullable = true)
>>>      |-- addresses: array (nullable = true)
>>>      |    |-- element: struct (containsNull = true)
>>>      |    |    |-- street: string (nullable = true)
>>>      |    |    |-- city: string (nullable = true)
>>>
>>> I want to output name and city. The following is my Spark Streaming app,
>>> which currently outputs name and addresses, but I want name and cities
>>> in the output.
>>>
>>>     object PersonConsumer {
>>>       import org.apache.spark.sql.SparkSession
>>>       import com.example.protos.demo._
>>>
>>>       def main(args: Array[String]): Unit = {
>>>
>>>         val spark = SparkSession.builder
>>>           .master("local")
>>>           .appName("spark session example")
>>>           .getOrCreate()
>>>
>>>         import spark.implicits._
>>>
>>>         val ds1 = spark.readStream
>>>           .format("kafka")
>>>           .option("kafka.bootstrap.servers", "localhost:9092")
>>>           .option("subscribe", "person")
>>>           .load()
>>>
>>>         val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value"))
>>>           .map(Person.parseFrom(_))
>>>           .select($"name", $"addresses")
>>>
>>>         ds2.printSchema()
>>>
>>>         val query = ds2.writeStream
>>>           .outputMode("append")
>>>           .format("console")
>>>           .start()
>>>
>>>         query.awaitTermination()
>>>       }
>>>     }
>>>
>>> Appreciate your help. Thanks.
>>>
>>
>>
>>
>> --
>> Thanks,
>> Pandeeswaran
>>
>
>
